You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2013/04/25 20:46:46 UTC
svn commit: r1475899 - in /jena/trunk/jena-arq/src:
main/java/com/hp/hpl/jena/sparql/engine/
main/java/org/apache/jena/atlas/lib/ test/java/com/hp/hpl/jena/sparql/api/
test/java/org/apache/jena/atlas/lib/
Author: andy
Date: Thu Apr 25 18:46:46 2013
New Revision: 1475899
URL: http://svn.apache.org/r1475899
Log:
JENA-440
Correctly set/reset timeouts.
Added:
jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout1.java
- copied, changed from r1469820, jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout.java
jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout2.java
Removed:
jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout.java
Modified:
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java
jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/AlarmClock.java
jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/Pingback.java
jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TS_API.java
jena/trunk/jena-arq/src/test/java/org/apache/jena/atlas/lib/TestAlarmClock.java
Modified: jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java?rev=1475899&r1=1475898&r2=1475899&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java (original)
+++ jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java Thu Apr 25 18:46:46 2013
@@ -25,8 +25,6 @@ import java.util.Set ;
import java.util.concurrent.TimeUnit ;
import org.apache.jena.atlas.lib.AlarmClock ;
-import org.apache.jena.atlas.lib.Callback ;
-import org.apache.jena.atlas.lib.Pingback ;
import org.apache.jena.atlas.logging.Log ;
import com.hp.hpl.jena.graph.Node ;
@@ -61,19 +59,32 @@ public class QueryExecutionBase implemen
// Initial bindings.
// Split : QueryExecutionGraph already has the dataset.
- private Query query ;
- private Dataset dataset ;
- private QueryEngineFactory qeFactory ;
- private QueryIterator queryIterator = null ;
- private Plan plan = null ;
- private Context context ;
- private FileManager fileManager = FileManager.get() ;
- private QuerySolution initialBinding = null ;
- private Object lockTimeout = new Object() ; // syncrhonization.
+ private Query query ;
+ private Dataset dataset ;
+ private QueryEngineFactory qeFactory ;
+ private QueryIterator queryIterator = null ;
+ private Plan plan = null ;
+ private Context context ;
+ private FileManager fileManager = FileManager.get() ;
+ private QuerySolution initialBinding = null ;
+
+ // Set if QueryIterator.cancel has been called
+ private volatile boolean isCancelled = false ;
+ private volatile TimeoutCallback expectedCallback = null ;
+ private TimeoutCallback timeout1Callback = null ;
+ private TimeoutCallback timeout2Callback = null ;
+
+ private final Object lockTimeout = new Object() ; // synchronization.
+ private static final long TIMEOUT_UNSET = -1 ;
+ private static final long TIMEOUT_INF = -2 ;
+ private static boolean isTimeoutSet(long x)
+ {
+ return x >= 0 ;
+ }
+ private long timeout1 = TIMEOUT_UNSET ;
+ private long timeout2 = TIMEOUT_UNSET ;
+ private final AlarmClock alarmClock = AlarmClock.get() ;
- // has cancel() been called?
- private volatile boolean cancel = false ;
-
public QueryExecutionBase(Query query,
Dataset dataset,
Context context,
@@ -92,6 +103,10 @@ public class QueryExecutionBase implemen
context = Context.setupContext(context, dsg) ;
if ( query != null )
context.put(ARQConstants.sysCurrentQuery, query) ;
+ // NB: Settign timeouts via the context after creating a QueryExecutionBase
+ // will not work.
+ // But we can't move it until the point the execution starts because of
+ // get and set timeout oeprations on this object.
setAnyTimeouts() ;
}
@@ -125,7 +140,6 @@ public class QueryExecutionBase implemen
if ( obj instanceof Number )
{
long x = ((Number)obj).longValue() ;
- //System.err.println("timeout("+x+")") ;
setTimeout(x) ;
} else if ( obj instanceof String )
{
@@ -136,13 +150,11 @@ public class QueryExecutionBase implemen
String[] a = str.split(",") ;
long x1 = Long.parseLong(a[0]) ;
long x2 = Long.parseLong(a[1]) ;
- //System.err.println("timeout("+x1+", "+x2+")") ;
setTimeout(x1, x2) ;
}
else
{
long x = Long.parseLong(str) ;
- //System.err.println("timeout("+x+")") ;
setTimeout(x) ;
}
} catch (RuntimeException ex) { Log.warn(this, "Can't interpret string for timeout: "+obj) ; }
@@ -159,12 +171,15 @@ public class QueryExecutionBase implemen
queryIterator.close() ;
if ( plan != null )
plan.close() ;
- cancelPingback() ;
+ if ( timeout1Callback != null )
+ alarmClock.cancel(timeout1Callback) ;
+ if ( timeout2Callback != null )
+ alarmClock.cancel(timeout2Callback) ;
}
@Override
public void abort()
- {
+ {
synchronized(lockTimeout)
{
// This is called asynchronously to the execution.
@@ -174,9 +189,9 @@ public class QueryExecutionBase implemen
// we notify the chain of iterators, however, we do *not* close the iterators.
// That happens after the cancellation is properly over.
queryIterator.cancel() ;
- cancel = true ;
+ isCancelled = true ;
}
- }
+ }
@Override
public ResultSet execSelect()
@@ -343,9 +358,10 @@ public class QueryExecutionBase implemen
@Override
public void setTimeout(long timeout, TimeUnit timeUnit)
{
+ // Overall timeout - recorded as (UNSET,N)
long x = asMillis(timeout, timeUnit) ;
- this.timeout1 = x ;
- this.timeout2 = TIMEOUT_UNSET ;
+ this.timeout1 = TIMEOUT_UNSET ;
+ this.timeout2 = x ;
}
@Override
@@ -357,11 +373,12 @@ public class QueryExecutionBase implemen
@Override
public void setTimeout(long timeout1, TimeUnit timeUnit1, long timeout2, TimeUnit timeUnit2)
{
+ // Two timeouts.
long x1 = asMillis(timeout1, timeUnit1) ;
long x2 = asMillis(timeout2, timeUnit2) ;
this.timeout1 = x1 ;
if ( timeout2 < 0 )
- this.timeout2 = TIMEOUT_INF ;
+ this.timeout2 = TIMEOUT_UNSET ;
else
this.timeout2 = x2 ;
}
@@ -382,33 +399,20 @@ public class QueryExecutionBase implemen
@Override
public long getTimeout2() { return timeout2 ; }
- private static final long TIMEOUT_UNSET = -1 ;
- private static final long TIMEOUT_INF = -2 ;
- private long timeout1 = TIMEOUT_UNSET ;
- private long timeout2 = TIMEOUT_UNSET ;
-
- private static AlarmClock alarmClock = AlarmClock.get() ;
- private static final Callback<QueryExecution> callback =
- new Callback<QueryExecution>() {
- @Override
- public void proc(QueryExecution qExec)
+ class TimeoutCallback implements Runnable
+ {
+ @Override
+ public void run()
+ {
+ synchronized(lockTimeout)
{
- qExec.abort() ;
+ // Abort query if and only if we are the expected callback.
+ // If the first row has appeared, and we are removing timeout1 callback,
+ // it still may go off so it needs to check here it's still wanted.
+ if ( expectedCallback == this )
+ QueryExecutionBase.this.abort() ;
}
- } ;
-
- private Pingback<QueryExecution> pingback = null ;
-
- private void initTimeout1()
- {
- if ( timeout1 == TIMEOUT_UNSET ) return ;
-
- if ( pingback != null )
- alarmClock.reset(pingback, timeout1) ;
- else
- pingback = alarmClock.add(callback, this, timeout1) ;
- return ;
- // Second timeout done by wrapping the iterator.
+ }
}
private class QueryIteratorTimer2 extends QueryIteratorWrapper
@@ -425,32 +429,33 @@ public class QueryExecutionBase implemen
{
Binding b = super.moveToNextBinding() ;
yieldCount++ ;
+
if ( ! resetDone )
{
- // Synchronize with abort.
+ // Sync on calls of .abort.
+ // So nearly not needed.
synchronized(lockTimeout)
{
- if ( cancel )
+ expectedCallback = timeout2Callback ;
+ // Lock against calls of .abort() nor of timeout1Callback.
+
+ // Update/check the volatiles in a careful order.
+ // This cause timeout1 not to call .abort and hence not set isCancelled
+
+ // But if timeout1 went off after moveToNextBinding, before expectedCallback is set,
+ // then formget the row and cacnel the query.
+ if ( isCancelled )
// timeout1 went off after the binding was yielded but
// before we got here.
throw new QueryCancelledException() ;
-
- // Cancel timeout1?
- if ( pingback == null )
- {
- if ( timeout2 > 0 )
- // Not first timeout - finite second timeout.
- pingback = alarmClock.add(callback, QueryExecutionBase.this, timeout2) ;
- }
- else
- {
- // We have moved for the first time.
- // Reset the timer if finite timeout2 else cancel.
- if ( timeout2 < 0 )
- alarmClock.cancel(pingback) ;
- else
- pingback = alarmClock.reset(pingback, timeout2) ;
- }
+ if ( timeout1Callback != null )
+ alarmClock.cancel(timeout1Callback) ;
+ timeout1Callback = null ;
+
+ // Now arm the second timeout, if any.
+ if ( timeout2 > 0 )
+ // Not first timeout - finite second timeout.
+ alarmClock.add(timeout2Callback, timeout2) ;
resetDone = true ;
}
}
@@ -458,20 +463,6 @@ public class QueryExecutionBase implemen
}
}
- private QueryIterator initTimeout2(QueryIterator queryIterator)
- {
- if ( timeout2 < 0 || timeout2 == TIMEOUT_INF )
- return queryIterator ;
- // Wrap with a resetter.
- return new QueryIteratorTimer2(queryIterator) ;
- }
-
- private void cancelPingback()
- {
- if ( pingback != null )
- alarmClock.cancel(pingback) ;
- }
-
protected void execInit() { }
private ResultSet asResultSet(QueryIterator qIter)
@@ -486,19 +477,60 @@ public class QueryExecutionBase implemen
return rStream ;
}
+ /** Start the query iterator, setting timeouts as needed. */
private void startQueryIterator()
{
execInit() ;
if ( queryIterator != null )
Log.warn(this, "Query iterator has already been started") ;
- initTimeout1() ;
+
+ /* Timeouts:
+ * -1,-1 No timeouts
+ * N, same as -1,N Overall timeout only. No wrapper needed.
+ * N,-1 Timeout on first row only. Need to cancel on first row.
+ * N,M First/overall timeout. Need to reset on first row.
+ */
+
+ if ( ! isTimeoutSet(timeout1) && ! isTimeoutSet(timeout2) )
+ {
+ // Case -1,-1
+ queryIterator = getPlan().iterator() ;
+ return ;
+ }
+
+ if ( ! isTimeoutSet(timeout1) && isTimeoutSet(timeout2) )
+ {
+ // Single overall timeout.
+ timeout2Callback = new TimeoutCallback() ;
+ expectedCallback = timeout2Callback ;
+ alarmClock.add(timeout2Callback, timeout2) ;
+ // Start the query.
+ queryIterator = getPlan().iterator() ;
+ // But don't add resetter.
+ return ;
+ }
+
+ // Case isTimeoutSet(timeout1)
+ // Add timeout to first row.
+ timeout1Callback = new TimeoutCallback() ;
+ alarmClock.add(timeout1Callback, timeout1) ;
+ expectedCallback = timeout1Callback ;
+
// We don't know if getPlan().iterator() does a lot of work or not
// (ideally it shouldn't start executing the query but in some sub-systems
// it might be necessary)
queryIterator = getPlan().iterator() ;
- // Add the second timeout wrapper.
- queryIterator = initTimeout2(queryIterator) ;
- if ( cancel ) queryIterator.cancel() ;
+
+ // Add the timeout resetter wrapper.
+ timeout2Callback = new TimeoutCallback() ;
+ // Wrap with a resetter.
+ queryIterator = new QueryIteratorTimer2(queryIterator) ;
+
+ // Minor optimization - the first call of hasNext() or next() will
+ // throw QueryCancelledExcetion anyway. This just makes it a bit earlier
+ // in the case when the timeout (timoeut1) is so short it's gone off already.
+
+ if ( isCancelled ) queryIterator.cancel() ;
}
private ResultSet execResultSet()
@@ -574,12 +606,10 @@ public class QueryExecutionBase implemen
if ( baseURI == null )
baseURI = IRIResolver.chooseBaseURI() ;
- DatasetGraph dsg =
- DatasetUtils.createDatasetGraph(query.getDatasetDescription(),
- fileManager, baseURI ) ;
+ DatasetGraph dsg = DatasetUtils.createDatasetGraph(query.getDatasetDescription(),
+ fileManager, baseURI ) ;
return dsg ;
}
-
@Override
public void setFileManager(FileManager fm) { fileManager = fm ; }
@@ -589,7 +619,6 @@ public class QueryExecutionBase implemen
{
initialBinding = startSolution ;
}
-
//protected QuerySolution getInputBindings() { return initialBinding ; }
Modified: jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/AlarmClock.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/AlarmClock.java?rev=1475899&r1=1475898&r2=1475899&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/AlarmClock.java (original)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/AlarmClock.java Thu Apr 25 18:46:46 2013
@@ -18,114 +18,52 @@
package org.apache.jena.atlas.lib;
-import java.util.HashSet ;
-import java.util.Set ;
-import java.util.Timer ;
-
-/** An AlarmClock is an object that will make a call back at a preset time.
- * It adds tracking to a java.util.Time and also by having
- * an active Timer (and its thread) only when callbacks are outstanding.
- * The Timer's thread can stop the JVM exiting.
+import java.util.concurrent.ScheduledThreadPoolExecutor ;
+import java.util.concurrent.TimeUnit ;
+
+/** An AlarmClock is an object that will make a callback (with a vaklue) at a preset time.
+ * Simple abstraction of add/reset/cancel of a Runnable.
+ * Currently, backed by {@linkplain ScheduledThreadPoolExecutor}
*/
public class AlarmClock
{
- // ** Switch to ScheduledThreadPoolExecutor
- // Our callback-later instance
- // Wrap a TimerTask so that the TimerTask.cancel operation can not be called
- // directly by the app. We need to go via AlarmClock tracking of callbacks so
- // we can release the Timer in AlarmClock.
+ ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1) ;
- public Timer timer = null ;
- public Set<Pingback<?>> outstanding = new HashSet<Pingback<?>>() ;
-
- public AlarmClock() {}
+ /*package*/ AlarmClock() {}
- static private AlarmClock singleton = new AlarmClock() ; ;
+ static private AlarmClock singleton = new AlarmClock() ;
/** Global singleton for general use */
static public AlarmClock get()
{
return singleton ;
}
- synchronized public long getCount() { return outstanding.size() ; }
-
- synchronized public Pingback<?> add(Callback<?> callback, long delay)
- {
- return add(callback, null, delay) ;
- }
-
- synchronized public <T> Pingback<T> add(Callback<T> callback, T argument, long delay)
+ public void add(Runnable task, long delay)
{
- Pingback<T> x = new Pingback<T>(this, callback, argument) ;
- add$(x, delay) ;
- return x ;
- }
-
- private <T> void add$(Pingback<T> pingback, long delay)
- {
- if ( outstanding.contains(pingback) )
- throw new InternalErrorException("Pingback already in use") ;
- getTimer().schedule(pingback.timerTask, delay) ;
- outstanding.add(pingback) ;
+ if ( task == null )
+ throw new IllegalArgumentException("Task is null") ;
+ timer.schedule(task, delay, TimeUnit.MILLISECONDS) ;
}
- synchronized public <T> Pingback<T> reset(Pingback<T> pingback, long delay)
+ public void reset(Runnable task, long delay)
{
- if ( timer != null )
- cancel$(pingback, false) ;
- // Experimentation shows we need to create a new TimerTask.
- pingback = new Pingback<T>(this, pingback.callback, pingback.arg) ;
- add$(pingback, delay) ;
- return pingback ;
+ if ( task == null )
+ throw new IllegalArgumentException("Task is null") ;
+ cancel(task) ;
+ add(task, delay) ;
}
- synchronized public void cancel(Pingback<?> pingback)
- {
- if ( pingback == null )
- return ;
- cancel$(pingback, true) ;
- }
-
- private void cancel$(Pingback<?> pingback, boolean clearTimer)
- {
- if ( timer == null )
- // Nothing outstanding.
- return ;
- // Calls remove$
- pingback.cancel();
- // Throw timer, and it's thread, away if no outstanding pingbacks.
- // This helps apps exit properly but may be troublesome in large systems.
- // May reconsider.
- if ( clearTimer && getCount() == 0 )
- {
- release() ;
- }
- }
-
- /*package*/ void remove$(Pingback<?> pingback)
+ public void cancel(Runnable task)
{
- outstanding.remove(pingback) ;
+ if ( task == null )
+ throw new IllegalArgumentException("Task is null") ;
+ timer.remove(task) ;
}
- public synchronized void release()
- {
- release$() ;
- }
-
- private void release$()
- {
- if ( timer != null )
- {
- timer.cancel() ;
- timer = null ;
- }
- }
+ //public int getCount() { return timer.getQueue().size(); }
- /*synchronized*/ private Timer getTimer()
+ public void release()
{
- if ( timer == null )
- timer = new Timer(true) ;
- return timer ;
+ timer.shutdownNow() ;
}
-}
-
+}
\ No newline at end of file
Modified: jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/Pingback.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/Pingback.java?rev=1475899&r1=1475898&r2=1475899&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/Pingback.java (original)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/Pingback.java Thu Apr 25 18:46:46 2013
@@ -18,41 +18,21 @@
package org.apache.jena.atlas.lib;
-import java.util.TimerTask ;
-
-/** Wrapper around a TimerTask, adding a callback with argument. */
-public class Pingback<T>
+/** Binding of a value to a Callback as a Runnable */
+public class Pingback<T> implements Runnable
{
- private final AlarmClock alarmClock ;
- final TimerTask timerTask ;
- final Callback<T> callback ;
- final T arg ;
- // As good as an AtomicBoolean which is implemented as a volative int for get/set.
- private volatile boolean cancelled = false ;
+ private final T arg ;
+ private final Callback<T> callback ;
- Pingback(final AlarmClock alarmClock, final Callback<T> callback, T argument)
+ private Pingback(Callback<T> callback, T arg)
{
- this.alarmClock = alarmClock ;
+ this.arg = arg ;
this.callback = callback ;
- this.arg = argument ;
- this.timerTask = new TimerTask() {
- @Override
- public void run()
- {
- if ( cancelled )
- return ;
- cancelled = true ;
- alarmClock.remove$(Pingback.this) ;
- callback.proc(arg) ;
- }
- } ;
}
-
- void cancel()
+
+ @Override
+ public void run()
{
- timerTask.cancel() ;
- cancelled = true ;
- alarmClock.remove$(this) ;
+ callback.proc(arg) ;
}
}
-
Modified: jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TS_API.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TS_API.java?rev=1475899&r1=1475898&r2=1475899&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TS_API.java (original)
+++ jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TS_API.java Thu Apr 25 18:46:46 2013
@@ -26,7 +26,8 @@ import org.junit.runners.Suite.SuiteClas
@SuiteClasses( {
TestAPI.class
, TestQueryExecutionCancel.class
- , TestQueryExecutionTimeout.class
+ , TestQueryExecutionTimeout1.class
+ , TestQueryExecutionTimeout2.class
})
public class TS_API
Copied: jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout1.java (from r1469820, jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout.java)
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout1.java?p2=jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout1.java&p1=jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout.java&r1=1469820&r2=1475899&rev=1475899&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout.java (original)
+++ jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout1.java Thu Apr 25 18:46:46 2013
@@ -42,7 +42,7 @@ import com.hp.hpl.jena.sparql.function.F
import com.hp.hpl.jena.sparql.function.library.wait ;
import com.hp.hpl.jena.sparql.sse.SSE ;
-public class TestQueryExecutionTimeout extends BaseTest
+public class TestQueryExecutionTimeout1 extends BaseTest
{
static Graph g = SSE.parseGraph("(graph (<s> <p> <o1>) (<s> <p> <o2>) (<s> <p> <o3>))") ;
static DatasetGraph dsg = DatasetGraphFactory.createOneGraph(g) ;
Added: jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout2.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout2.java?rev=1475899&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout2.java (added)
+++ jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout2.java Thu Apr 25 18:46:46 2013
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.sparql.api;
+
+import static org.apache.jena.atlas.lib.Lib.sleep ;
+import org.junit.Assert ;
+import org.junit.Test ;
+
+import com.hp.hpl.jena.graph.Graph ;
+import com.hp.hpl.jena.query.* ;
+import com.hp.hpl.jena.sparql.core.DatasetGraph ;
+import com.hp.hpl.jena.sparql.core.DatasetGraphFactory ;
+import com.hp.hpl.jena.sparql.engine.binding.Binding ;
+import com.hp.hpl.jena.sparql.sse.SSE ;
+
+public class TestQueryExecutionTimeout2
+{
+ // Testign related to JENA-440
+
+ static private String prefix =
+ "PREFIX f: <http://example/ns#>\n"+
+ "PREFIX afn: <http://jena.hpl.hp.com/ARQ/function#>\n" ;
+ static Graph g = SSE.parseGraph("(graph " +
+ "(<s> <p> 1)" +
+ " (<s> <p> 2)" +
+ " (<s> <p> 3)" +
+ " (<s> <p> 4)" +
+ " (<s> <p> 5)" +
+ " (<s> <p> 6)" +
+ " (<s> <p> 7)" +
+ " (<s> <p> 8)" +
+ " (<s> <p> 9)" +
+ " (<s> <p> 10)" +
+ " (<s> <p> 11)" +
+ " (<s> <p> 12)" +
+ ")") ;
+ static DatasetGraph dsg = DatasetGraphFactory.createOneGraph(g) ;
+ static Dataset ds = DatasetFactory.create(dsg) ;
+
+ private static void noException(ResultSet rs)
+ {
+ ResultSetFormatter.consume(rs) ;
+ }
+
+ private static void exceptionExpected(ResultSet rs)
+ {
+ try { ResultSetFormatter.consume(rs) ; Assert.fail("QueryCancelledException expected") ; } catch (QueryCancelledException ex) {}
+ }
+
+
+ @Test public void timeout_30() { test2(200, 20, 50, true) ; }
+ @Test public void timeout_31() { test2(200, 50, 20, false) ; }
+
+ // Make sure it isn't timeout1 - delay longer than timeout1
+ @Test public void timeout_32() { test2(100, 500, 200, false) ; }
+ @Test public void timeout_33() { test2(150, -1, 200, false) ; }
+
+ @Test public void timeout_34() { test2(10, 40, 100, true) ; }
+
+ @Test public void timeout_35() { test2(-1, 20, 50, true) ; }
+ @Test public void timeout_36() { test2(-1, 50, 20, false) ; }
+
+ @Test public void timeout_37() { test2(200, 200, 50, false) ; }
+ @Test public void timeout_38() { test2(200, -1, 50, false) ; }
+
+
+ private static void test2(long timeout1, long timeout2, int delay, boolean exceptionExpected)
+ {
+ // Enough rows to keep the iterator pipeline full.
+ QueryExecution qExec = QueryExecutionFactory.create(prefix+"SELECT * { ?s ?p ?o }", ds) ;
+ try {
+ qExec.setTimeout(timeout1, timeout2) ;
+ // No rewrite optimizations.
+ // qExec.getContext().set(ARQConstants.sysOptimizerFactory, Optimize.noOptimizationFactory) ;
+ ResultSet rs = qExec.execSelect() ;
+ // ... wait for first binding.
+ Binding b1 = rs.nextBinding() ;
+ //System.err.println(b1) ;
+ // ... then a possible timeout.
+ sleep(delay) ;
+ if ( exceptionExpected )
+ exceptionExpected(rs) ;
+ else
+ noException(rs) ;
+ } finally {
+ qExec.close() ;
+ }
+ }
+}
+
Modified: jena/trunk/jena-arq/src/test/java/org/apache/jena/atlas/lib/TestAlarmClock.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/org/apache/jena/atlas/lib/TestAlarmClock.java?rev=1475899&r1=1475898&r2=1475899&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/test/java/org/apache/jena/atlas/lib/TestAlarmClock.java (original)
+++ jena/trunk/jena-arq/src/test/java/org/apache/jena/atlas/lib/TestAlarmClock.java Thu Apr 25 18:46:46 2013
@@ -23,117 +23,71 @@ import static org.apache.jena.atlas.lib.
import java.util.concurrent.atomic.AtomicInteger ;
import org.apache.jena.atlas.junit.BaseTest ;
-import org.apache.jena.atlas.lib.AlarmClock ;
-import org.apache.jena.atlas.lib.Callback ;
-import org.apache.jena.atlas.lib.Pingback ;
import org.junit.Test ;
public class TestAlarmClock extends BaseTest
{
- static class CallbackTest implements Callback<Object>
- {
- AtomicInteger count = new AtomicInteger(0) ;
-
- public int getCount() { return count.get() ; }
+ AtomicInteger count = new AtomicInteger(0) ;
+ Runnable callback = new Runnable() {
@Override
- public void proc(Object arg)
+ public void run()
{
count.getAndIncrement() ;
- }
- } ;
-
- CallbackTest callback = new CallbackTest() ;
+ }} ;
@Test public void alarm_01()
{
AlarmClock alarmClock = new AlarmClock() ;
- assertEquals(0, alarmClock.getCount()) ;
// Very long - never happens.
- Pingback<?> ping = alarmClock.add(callback, 10000000) ;
- assertEquals(1, alarmClock.getCount()) ;
- alarmClock.cancel(ping) ;
- assertEquals(0, alarmClock.getCount()) ;
- assertEquals(0, callback.getCount()) ;
+ alarmClock.add(callback, 10000000) ;
+ alarmClock.cancel(callback) ;
+ assertEquals(0, count.get()) ;
alarmClock.release() ;
}
@Test public void alarm_02()
{
AlarmClock alarmClock = new AlarmClock() ;
- assertEquals(0, alarmClock.getCount()) ;
// Short - happens.
- Pingback<?> ping = alarmClock.add(callback, 10) ;
+ alarmClock.add(callback, 10) ;
sleep(100) ;
- assertEquals(0, alarmClock.getCount()) ;
- assertEquals(1, callback.getCount()) ;
-
+ assertEquals(1, count.get()) ;
// try to cancel anyway.
- ping.cancel() ;
- assertEquals(0, alarmClock.getCount()) ;
+ alarmClock.cancel(callback) ;
alarmClock.release() ;
}
@Test public void alarm_03()
{
AlarmClock alarmClock = new AlarmClock() ;
- assertEquals(0, alarmClock.getCount()) ;
- Pingback<?> ping1 = alarmClock.add(callback, 100) ;
- Pingback<?> ping2 = alarmClock.add(callback, 100000) ;
- assertEquals(2, alarmClock.getCount()) ;
- sleep(200) ;
+ alarmClock.add(callback, 50) ;
+ alarmClock.add(callback, 100000) ;
+ sleep(100) ;
// ping1 went off.
- assertEquals(1, alarmClock.getCount()) ;
- assertEquals(1, callback.getCount()) ;
- ping1.cancel() ;
-
- assertEquals(1, alarmClock.getCount()) ;
- alarmClock.cancel(ping2) ;
- assertEquals(0, alarmClock.getCount()) ;
- assertEquals(1, callback.getCount()) ;
+ assertEquals(1, count.get()) ;
+ alarmClock.cancel(callback) ;
alarmClock.release() ;
}
@Test public void alarm_04()
{
AlarmClock alarmClock = new AlarmClock() ;
- assertEquals(0, alarmClock.getCount()) ;
- Pingback<?> ping1 = alarmClock.add(callback, 10) ;
- Pingback<?> ping2 = alarmClock.add(callback, 20) ;
- assertEquals(2, alarmClock.getCount()) ;
+ alarmClock.add(callback, 10) ;
+ alarmClock.add(callback, 20) ;
sleep(200) ;
// ping1 went off. ping2 went off.
- assertEquals(0, alarmClock.getCount()) ;
- assertEquals(2, callback.getCount()) ;
+ assertEquals(2, count.get()) ;
alarmClock.release() ;
}
@Test public void alarm_05()
{
AlarmClock alarmClock = new AlarmClock() ;
- assertEquals(0, alarmClock.getCount()) ;
- Pingback<?> ping1 = alarmClock.add(callback, 100) ;
- assertEquals(1, alarmClock.getCount()) ;
- alarmClock.reset(ping1, 2000) ;
- assertEquals(1, alarmClock.getCount()) ;
- sleep(100) ;
- assertEquals(1, alarmClock.getCount()) ;
+ alarmClock.add(callback, 100) ;
+ alarmClock.reset(callback, 2000) ;
+ sleep(50) ;
+ assertEquals(0, count.get()) ;
alarmClock.release() ;
}
-
- @Test public void alarm_06()
- {
- AlarmClock alarmClock = new AlarmClock() ;
- assertEquals(0, alarmClock.getCount()) ;
- Pingback<?> ping1 = alarmClock.add(callback, 50) ;
- Pingback<?> ping2 = alarmClock.add(callback, 100) ;
- assertEquals(2, alarmClock.getCount()) ;
- alarmClock.reset(ping1, 2000) ;
- assertEquals(2, alarmClock.getCount()) ;
- sleep(200) ; // just ping2 goes off.
- assertEquals(1, callback.getCount()) ;
- assertEquals(1, alarmClock.getCount()) ;
- alarmClock.release() ;
- }
-
}