You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2013/04/25 20:46:46 UTC

svn commit: r1475899 - in /jena/trunk/jena-arq/src: main/java/com/hp/hpl/jena/sparql/engine/ main/java/org/apache/jena/atlas/lib/ test/java/com/hp/hpl/jena/sparql/api/ test/java/org/apache/jena/atlas/lib/

Author: andy
Date: Thu Apr 25 18:46:46 2013
New Revision: 1475899

URL: http://svn.apache.org/r1475899
Log:
JENA-440
Correctly set/reset timeouts.

Added:
    jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout1.java
      - copied, changed from r1469820, jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout.java
    jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout2.java
Removed:
    jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout.java
Modified:
    jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java
    jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/AlarmClock.java
    jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/Pingback.java
    jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TS_API.java
    jena/trunk/jena-arq/src/test/java/org/apache/jena/atlas/lib/TestAlarmClock.java

Modified: jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java?rev=1475899&r1=1475898&r2=1475899&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java (original)
+++ jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java Thu Apr 25 18:46:46 2013
@@ -25,8 +25,6 @@ import java.util.Set ;
 import java.util.concurrent.TimeUnit ;
 
 import org.apache.jena.atlas.lib.AlarmClock ;
-import org.apache.jena.atlas.lib.Callback ;
-import org.apache.jena.atlas.lib.Pingback ;
 import org.apache.jena.atlas.logging.Log ;
 
 import com.hp.hpl.jena.graph.Node ;
@@ -61,19 +59,32 @@ public class QueryExecutionBase implemen
     // Initial bindings.
     // Split : QueryExecutionGraph already has the dataset.
 
-    private Query              query ;
-    private Dataset            dataset ;
-    private QueryEngineFactory qeFactory ;
-    private QueryIterator      queryIterator = null ;
-    private Plan               plan = null ;
-    private Context            context ;
-    private FileManager        fileManager = FileManager.get() ;
-    private QuerySolution      initialBinding = null ;      
-    private Object             lockTimeout = new Object() ;     // syncrhonization.  
+    private Query               query ;
+    private Dataset             dataset ;
+    private QueryEngineFactory  qeFactory ;
+    private QueryIterator       queryIterator = null ;
+    private Plan                plan = null ;
+    private Context             context ;
+    private FileManager         fileManager = FileManager.get() ;
+    private QuerySolution       initialBinding = null ; 
+    
+    // Set if QueryIterator.cancel has been called 
+    private volatile boolean    isCancelled = false ;
+    private volatile TimeoutCallback expectedCallback = null ;    
+    private TimeoutCallback timeout1Callback = null ;
+    private TimeoutCallback timeout2Callback = null ;
+    
+    private final Object        lockTimeout = new Object() ;     // synchronization.  
+    private static final long   TIMEOUT_UNSET = -1 ;
+    private static final long   TIMEOUT_INF = -2 ;
+    private static boolean isTimeoutSet(long x)
+    { 
+        return x >= 0 ;
+    }
+    private long                timeout1 = TIMEOUT_UNSET ;
+    private long                timeout2 = TIMEOUT_UNSET ;
+    private final AlarmClock    alarmClock = AlarmClock.get() ;  
 
-    // has cancel() been called?
-    private volatile boolean   cancel = false ;
-    
     public QueryExecutionBase(Query query, 
                               Dataset dataset,
                               Context context,
@@ -92,6 +103,10 @@ public class QueryExecutionBase implemen
         context = Context.setupContext(context, dsg) ;
         if ( query != null )
             context.put(ARQConstants.sysCurrentQuery, query) ;
+        // NB: Settign timeouts via the context after creating a QueryExecutionBase 
+        // will not work.
+        // But we can't move it until the point the execution starts because of
+        // get and set timeout oeprations on this object.   
         setAnyTimeouts() ;
     }
     
@@ -125,7 +140,6 @@ public class QueryExecutionBase implemen
             if ( obj instanceof Number )
             {
                 long x = ((Number)obj).longValue() ;
-                //System.err.println("timeout("+x+")") ;
                 setTimeout(x) ;
             } else if ( obj instanceof String )
             {
@@ -136,13 +150,11 @@ public class QueryExecutionBase implemen
                         String[] a = str.split(",") ;
                         long x1 = Long.parseLong(a[0]) ;
                         long x2 = Long.parseLong(a[1]) ;
-                        //System.err.println("timeout("+x1+", "+x2+")") ;
                         setTimeout(x1, x2) ;
                     }
                     else
                     {
                         long x = Long.parseLong(str) ;
-                        //System.err.println("timeout("+x+")") ;
                         setTimeout(x) ;
                     }
                 } catch (RuntimeException ex) { Log.warn(this, "Can't interpret string for timeout: "+obj) ; }
@@ -159,12 +171,15 @@ public class QueryExecutionBase implemen
             queryIterator.close() ;
         if ( plan != null )
             plan.close() ;
-        cancelPingback() ;
+        if ( timeout1Callback != null )
+            alarmClock.cancel(timeout1Callback) ;
+        if ( timeout2Callback != null )
+            alarmClock.cancel(timeout2Callback) ;
     }
 
     @Override
     public void abort()
-	{
+    {
         synchronized(lockTimeout)
         {
             // This is called asynchronously to the execution.
@@ -174,9 +189,9 @@ public class QueryExecutionBase implemen
                 // we notify the chain of iterators, however, we do *not* close the iterators. 
                 // That happens after the cancellation is properly over.
                 queryIterator.cancel() ;
-            cancel = true ;
+            isCancelled = true ;
         }
-	}
+    }
     
     @Override
     public ResultSet execSelect()
@@ -343,9 +358,10 @@ public class QueryExecutionBase implemen
     @Override
     public void setTimeout(long timeout, TimeUnit timeUnit)
     {
+        // Overall timeout - recorded as (UNSET,N)
         long x = asMillis(timeout, timeUnit) ;
-        this.timeout1 = x ;
-        this.timeout2 = TIMEOUT_UNSET ;
+        this.timeout1 = TIMEOUT_UNSET ;
+        this.timeout2 = x ;
     }
 
     @Override
@@ -357,11 +373,12 @@ public class QueryExecutionBase implemen
     @Override
     public void setTimeout(long timeout1, TimeUnit timeUnit1, long timeout2, TimeUnit timeUnit2)
     {
+        // Two timeouts.
         long x1 = asMillis(timeout1, timeUnit1) ;
         long x2 = asMillis(timeout2, timeUnit2) ;
         this.timeout1 = x1 ;
         if ( timeout2 < 0 )
-            this.timeout2 = TIMEOUT_INF ;
+            this.timeout2 = TIMEOUT_UNSET ;
         else
             this.timeout2 = x2 ;
     }
@@ -382,33 +399,20 @@ public class QueryExecutionBase implemen
     @Override
     public long getTimeout2() { return timeout2 ; }
     
-    private static final long TIMEOUT_UNSET = -1 ;
-    private static final long TIMEOUT_INF = -2 ;
-    private long timeout1 = TIMEOUT_UNSET ;
-    private long timeout2 = TIMEOUT_UNSET ;
-    
-    private static AlarmClock alarmClock = AlarmClock.get() ; 
-    private static final Callback<QueryExecution> callback = 
-        new Callback<QueryExecution>() {
-            @Override
-            public void proc(QueryExecution qExec)
+    class TimeoutCallback implements Runnable
+    {
+        @Override
+        public void run()
+        {
+            synchronized(lockTimeout)
             {
-                qExec.abort() ;
+                // Abort query if and only if we are the expected callback.
+                // If the first row has appeared, and we are removing timeout1 callback,
+                // it still may go off so it needs to check here it's still wanted.
+                if ( expectedCallback == this )
+                    QueryExecutionBase.this.abort() ;
             }
-        } ;
-        
-    private Pingback<QueryExecution> pingback = null ;
-    
-    private void initTimeout1()
-    {
-        if ( timeout1 == TIMEOUT_UNSET ) return ;
-        
-        if ( pingback != null )
-            alarmClock.reset(pingback, timeout1) ;
-        else
-            pingback = alarmClock.add(callback, this, timeout1) ;
-        return ;
-        // Second timeout done by wrapping the iterator.
+        }
     }
     
     private class QueryIteratorTimer2 extends QueryIteratorWrapper
@@ -425,32 +429,33 @@ public class QueryExecutionBase implemen
         { 
             Binding b = super.moveToNextBinding() ;
             yieldCount++ ;
+            
             if ( ! resetDone )
             {
-                // Synchronize with abort.
+                // Sync on calls of .abort.
+                // So nearly not needed.
                 synchronized(lockTimeout)
                 {
-                    if ( cancel )
+                    expectedCallback = timeout2Callback ;
+                    // Lock against calls of .abort() nor of timeout1Callback. 
+                    
+                    // Update/check the volatiles in a careful order.
+                    // This cause timeout1 not to call .abort and hence not set isCancelled 
+
+                    // But if timeout1 went off after moveToNextBinding, before expectedCallback is set,
+                    // then formget the row and cacnel the query. 
+                    if ( isCancelled )
                         // timeout1 went off after the binding was yielded but 
                         // before we got here.
                         throw new QueryCancelledException() ;
-                    
-                    // Cancel timeout1?
-                    if ( pingback == null )
-                    {
-                        if ( timeout2 > 0 )
-                            // Not first timeout - finite second timeout. 
-                            pingback = alarmClock.add(callback, QueryExecutionBase.this, timeout2) ;
-                    }
-                    else
-                    {
-                        // We have moved for the first time.
-                        // Reset the timer if finite timeout2 else cancel.
-                        if ( timeout2 < 0 )
-                            alarmClock.cancel(pingback) ;
-                        else
-                            pingback = alarmClock.reset(pingback, timeout2) ;
-                    }
+                    if ( timeout1Callback != null )
+                        alarmClock.cancel(timeout1Callback) ;
+                        timeout1Callback = null ;
+
+                    // Now arm the second timeout, if any.
+                    if ( timeout2 > 0 )
+                        // Not first timeout - finite second timeout. 
+                        alarmClock.add(timeout2Callback, timeout2) ;
                     resetDone = true ;
                 }
             }
@@ -458,20 +463,6 @@ public class QueryExecutionBase implemen
         }
     }
     
-    private QueryIterator initTimeout2(QueryIterator queryIterator)
-    {
-        if ( timeout2 < 0 || timeout2 == TIMEOUT_INF )
-            return queryIterator ;
-        // Wrap with a resetter.
-        return new QueryIteratorTimer2(queryIterator) ;
-    }
-    
-    private void cancelPingback()
-    {
-        if ( pingback != null )
-            alarmClock.cancel(pingback) ;
-    }
-    
     protected void execInit() { }
 
     private ResultSet asResultSet(QueryIterator qIter)
@@ -486,19 +477,60 @@ public class QueryExecutionBase implemen
         return rStream ;
     }
     
+    /** Start the query iterator, setting timeouts as needed. */ 
     private void startQueryIterator()
     {
         execInit() ;
         if ( queryIterator != null )
             Log.warn(this, "Query iterator has already been started") ;
-        initTimeout1() ;
+        
+        /* Timeouts:
+         * -1,-1                No timeouts
+         * N, same as -1,N      Overall timeout only.  No wrapper needed.
+         * N,-1                 Timeout on first row only. Need to cancel on first row. 
+         * N,M                  First/overall timeout. Need to reset on first row.
+         */
+        
+        if ( ! isTimeoutSet(timeout1) && ! isTimeoutSet(timeout2) )
+        {
+            // Case -1,-1
+            queryIterator = getPlan().iterator() ;
+            return ;
+        }
+        
+        if ( ! isTimeoutSet(timeout1) && isTimeoutSet(timeout2) )
+        {
+            // Single overall timeout.
+            timeout2Callback = new TimeoutCallback() ; 
+            expectedCallback = timeout2Callback ; 
+            alarmClock.add(timeout2Callback, timeout2) ;
+            // Start the query.
+            queryIterator = getPlan().iterator() ;
+            // But don't add resetter.
+            return ;
+        }
+
+        // Case isTimeoutSet(timeout1)
+        // Add timeout to first row.
+        timeout1Callback = new TimeoutCallback() ; 
+        alarmClock.add(timeout1Callback, timeout1) ;
+        expectedCallback = timeout1Callback ;
+
         // We don't know if getPlan().iterator() does a lot of work or not
         // (ideally it shouldn't start executing the query but in some sub-systems 
         // it might be necessary)
         queryIterator = getPlan().iterator() ;
-        // Add the second timeout wrapper.
-        queryIterator = initTimeout2(queryIterator) ;
-        if ( cancel ) queryIterator.cancel() ;
+        
+        // Add the timeout resetter wrapper.
+        timeout2Callback = new TimeoutCallback() ; 
+        // Wrap with a resetter.
+        queryIterator = new QueryIteratorTimer2(queryIterator) ;
+
+        // Minor optimization - the first call of hasNext() or next() will
+        // throw QueryCancelledExcetion anyway.  This just makes it a bit earlier
+        // in the case when the timeout (timoeut1) is so short it's gone off already.
+        
+        if ( isCancelled ) queryIterator.cancel() ;
     }
     
     private ResultSet execResultSet()
@@ -574,12 +606,10 @@ public class QueryExecutionBase implemen
         if ( baseURI == null )
             baseURI = IRIResolver.chooseBaseURI() ;
         
-        DatasetGraph dsg =
-            DatasetUtils.createDatasetGraph(query.getDatasetDescription(),
-                                            fileManager, baseURI ) ;
+        DatasetGraph dsg = DatasetUtils.createDatasetGraph(query.getDatasetDescription(),
+                                                           fileManager, baseURI ) ;
         return dsg ;
     }
-
     
     @Override
     public void setFileManager(FileManager fm) { fileManager = fm ; }
@@ -589,7 +619,6 @@ public class QueryExecutionBase implemen
     { 
         initialBinding = startSolution ;
     }
-
     
     //protected QuerySolution getInputBindings() { return initialBinding ; }
 

Modified: jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/AlarmClock.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/AlarmClock.java?rev=1475899&r1=1475898&r2=1475899&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/AlarmClock.java (original)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/AlarmClock.java Thu Apr 25 18:46:46 2013
@@ -18,114 +18,52 @@
 
 package org.apache.jena.atlas.lib;
 
-import java.util.HashSet ;
-import java.util.Set ;
-import java.util.Timer ;
-
-/** An AlarmClock is an object that will make a call back at a preset time.
- * It adds tracking to a java.util.Time and also by having
- * an active Timer (and its thread) only when callbacks are outstanding. 
- * The Timer's thread can stop the JVM exiting.
+import java.util.concurrent.ScheduledThreadPoolExecutor ;
+import java.util.concurrent.TimeUnit ;
+
+/** An AlarmClock is an object that will make a callback (with a vaklue)  at a preset time.
+ * Simple abstraction of add/reset/cancel of a Runnable.
+ * Currently, backed by {@linkplain ScheduledThreadPoolExecutor}
  */
 public class AlarmClock
 {
-    // ** Switch to ScheduledThreadPoolExecutor
-    // Our callback-later instance
-    // Wrap a TimerTask so that the TimerTask.cancel operation can not be called
-    // directly by the app. We need to go via AlarmClock tracking of callbacks so
-    // we can release the Timer in AlarmClock.
+    ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1) ;
 
-    public Timer timer = null ;
-    public Set<Pingback<?>> outstanding = new HashSet<Pingback<?>>() ;
-    
-    public AlarmClock() {}
+    /*package*/ AlarmClock() {}
     
-    static private AlarmClock singleton = new AlarmClock() ; ;
+    static private AlarmClock singleton = new AlarmClock() ;
     /** Global singleton for general use */ 
     static public AlarmClock get()
     {
         return singleton ;
     }
 
-    synchronized public long getCount() { return outstanding.size() ; }
-    
-    synchronized public Pingback<?> add(Callback<?> callback, long delay)
-    {
-        return add(callback, null, delay) ;
-    }
-    
-    synchronized public <T> Pingback<T> add(Callback<T> callback, T argument, long delay)
+    public void add(Runnable task, long delay)
     {    
-        Pingback<T> x = new Pingback<T>(this, callback, argument) ;
-        add$(x, delay) ;
-        return x ;
-    }
-    
-    private <T> void add$(Pingback<T> pingback, long delay)
-    {
-        if ( outstanding.contains(pingback) )
-            throw new InternalErrorException("Pingback already in use") ;
-        getTimer().schedule(pingback.timerTask, delay) ;
-        outstanding.add(pingback) ;
+        if ( task == null )
+            throw new IllegalArgumentException("Task is null") ;
+        timer.schedule(task, delay, TimeUnit.MILLISECONDS) ;
     }
 
-    synchronized public <T> Pingback<T> reset(Pingback<T> pingback, long delay)
+    public void reset(Runnable task, long delay)
     {
-        if ( timer != null )
-            cancel$(pingback, false) ;
-        // Experimentation shows we need to create a new TimerTask. 
-        pingback = new Pingback<T>(this, pingback.callback, pingback.arg) ;
-        add$(pingback, delay) ;
-        return pingback ;
+        if ( task == null )
+            throw new IllegalArgumentException("Task is null") ;
+        cancel(task) ;
+        add(task, delay) ;
     }
 
-    synchronized public void cancel(Pingback<?> pingback)
-    {
-        if ( pingback == null )
-            return ;
-        cancel$(pingback, true) ;
-    }
-    
-    private void cancel$(Pingback<?> pingback, boolean clearTimer)
-    {
-        if ( timer == null )
-            // Nothing outstanding.
-            return ;
-        // Calls remove$
-        pingback.cancel();
-        // Throw timer, and it's thread, away if no outstanding pingbacks.
-        // This helps apps exit properly but may be troublesome in large systems.
-        // May reconsider. 
-        if ( clearTimer && getCount() == 0 )
-        {
-            release() ;
-        }
-    }
-    
-    /*package*/ void remove$(Pingback<?> pingback)
+    public void cancel(Runnable task)
     {
-        outstanding.remove(pingback) ;
+        if ( task == null )
+            throw new IllegalArgumentException("Task is null") ;
+        timer.remove(task) ;
     }
     
-    public synchronized void release() 
-    {
-        release$() ;
-    }
-    
-    private void release$()
-    {
-        if ( timer != null )
-        {
-            timer.cancel() ;
-            timer = null ;
-        }
-    }
+    //public int getCount() { return timer.getQueue().size(); }
     
-    /*synchronized*/ private Timer getTimer()
+    public void release()
     {
-        if ( timer == null )
-            timer = new Timer(true) ;
-        return timer ;
+        timer.shutdownNow() ;
     }
-}  
- 
+} 
\ No newline at end of file

Modified: jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/Pingback.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/Pingback.java?rev=1475899&r1=1475898&r2=1475899&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/Pingback.java (original)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/atlas/lib/Pingback.java Thu Apr 25 18:46:46 2013
@@ -18,41 +18,21 @@
 
 package org.apache.jena.atlas.lib;
 
-import java.util.TimerTask ;
-
-/** Wrapper around a TimerTask, adding a callback with argument. */
-public class Pingback<T>
+/** Binding of a value to a Callback as a Runnable */
+public class Pingback<T> implements Runnable
 {
-    private final AlarmClock alarmClock ;
-    final TimerTask timerTask ;
-    final Callback<T> callback ;
-    final T arg ;
-    // As good as an AtomicBoolean which is implemented as a volative int for get/set.
-    private volatile boolean cancelled = false ;
+    private final T arg ;
+    private final Callback<T> callback ; 
 
-    Pingback(final AlarmClock alarmClock, final Callback<T> callback, T argument)
+    private Pingback(Callback<T> callback, T arg)
     {
-        this.alarmClock = alarmClock ;
+        this.arg = arg ;
         this.callback = callback ;
-        this.arg = argument ;
-        this.timerTask = new TimerTask() {
-            @Override
-            public void run()
-            {
-                if ( cancelled )
-                    return ;
-                cancelled = true ;
-                alarmClock.remove$(Pingback.this) ;
-                callback.proc(arg) ;
-            }
-        } ;
     }
-    
-    void cancel()
+
+    @Override
+    public void run()
     {
-        timerTask.cancel() ;
-        cancelled = true ;
-        alarmClock.remove$(this) ;
+        callback.proc(arg) ;
     }
 }
-

Modified: jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TS_API.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TS_API.java?rev=1475899&r1=1475898&r2=1475899&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TS_API.java (original)
+++ jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TS_API.java Thu Apr 25 18:46:46 2013
@@ -26,7 +26,8 @@ import org.junit.runners.Suite.SuiteClas
 @SuiteClasses( {
     TestAPI.class
     , TestQueryExecutionCancel.class
-    , TestQueryExecutionTimeout.class
+    , TestQueryExecutionTimeout1.class
+    , TestQueryExecutionTimeout2.class
 })
 
 public class TS_API

Copied: jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout1.java (from r1469820, jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout.java)
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout1.java?p2=jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout1.java&p1=jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout.java&r1=1469820&r2=1475899&rev=1475899&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout.java (original)
+++ jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout1.java Thu Apr 25 18:46:46 2013
@@ -42,7 +42,7 @@ import com.hp.hpl.jena.sparql.function.F
 import com.hp.hpl.jena.sparql.function.library.wait ;
 import com.hp.hpl.jena.sparql.sse.SSE ;
 
-public class TestQueryExecutionTimeout extends BaseTest
+public class TestQueryExecutionTimeout1 extends BaseTest
 {
     static Graph                g   = SSE.parseGraph("(graph (<s> <p> <o1>) (<s> <p> <o2>) (<s> <p> <o3>))") ;
     static DatasetGraph         dsg = DatasetGraphFactory.createOneGraph(g) ;

Added: jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout2.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout2.java?rev=1475899&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout2.java (added)
+++ jena/trunk/jena-arq/src/test/java/com/hp/hpl/jena/sparql/api/TestQueryExecutionTimeout2.java Thu Apr 25 18:46:46 2013
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.sparql.api;
+
+import static org.apache.jena.atlas.lib.Lib.sleep ;
+import org.junit.Assert ;
+import org.junit.Test ;
+
+import com.hp.hpl.jena.graph.Graph ;
+import com.hp.hpl.jena.query.* ;
+import com.hp.hpl.jena.sparql.core.DatasetGraph ;
+import com.hp.hpl.jena.sparql.core.DatasetGraphFactory ;
+import com.hp.hpl.jena.sparql.engine.binding.Binding ;
+import com.hp.hpl.jena.sparql.sse.SSE ;
+
+public class TestQueryExecutionTimeout2
+{
+    // Testign related to JENA-440
+
+    static private String prefix = 
+        "PREFIX f:       <http://example/ns#>\n"+
+            "PREFIX afn:     <http://jena.hpl.hp.com/ARQ/function#>\n" ;
+    static Graph                g   = SSE.parseGraph("(graph " +
+        "(<s> <p> 1)" +
+        " (<s> <p> 2)" +
+        " (<s> <p> 3)" +
+        " (<s> <p> 4)" +
+        " (<s> <p> 5)" +
+        " (<s> <p> 6)" +
+        " (<s> <p> 7)" +
+        " (<s> <p> 8)" +
+        " (<s> <p> 9)" +
+        " (<s> <p> 10)" +
+        " (<s> <p> 11)" +
+        " (<s> <p> 12)" +
+        ")") ;
+    static DatasetGraph         dsg = DatasetGraphFactory.createOneGraph(g) ;
+    static Dataset              ds  = DatasetFactory.create(dsg) ;
+
+    private static void noException(ResultSet rs)
+    {
+        ResultSetFormatter.consume(rs) ;
+    }
+
+    private static void exceptionExpected(ResultSet rs)
+    {
+        try { ResultSetFormatter.consume(rs) ; Assert.fail("QueryCancelledException expected") ; } catch (QueryCancelledException ex) {}
+    }
+
+
+    @Test public void timeout_30()  { test2(200, 20, 50, true) ; }
+    @Test public void timeout_31()  { test2(200, 50, 20, false) ; }
+
+    // Make sure it isn't timeout1 - delay longer than timeout1
+    @Test public void timeout_32()  { test2(100, 500, 200, false) ; }
+    @Test public void timeout_33()  { test2(150, -1,  200, false) ; }
+
+    @Test public void timeout_34()  { test2(10, 40, 100, true) ; }
+
+    @Test public void timeout_35()  { test2(-1, 20, 50, true) ; }
+    @Test public void timeout_36()  { test2(-1, 50, 20, false) ; }
+
+    @Test public void timeout_37()  { test2(200, 200, 50, false) ; }
+    @Test public void timeout_38()  { test2(200, -1, 50, false) ; }
+
+
+    private static void test2(long timeout1, long timeout2, int delay, boolean exceptionExpected)
+    {
+        // Enough rows to keep the iterator pipeline full.
+        QueryExecution qExec = QueryExecutionFactory.create(prefix+"SELECT * { ?s ?p ?o }", ds) ;
+        try {
+            qExec.setTimeout(timeout1, timeout2) ;
+            // No rewrite optimizations.
+            // qExec.getContext().set(ARQConstants.sysOptimizerFactory, Optimize.noOptimizationFactory) ;
+            ResultSet rs = qExec.execSelect() ;
+            // ... wait for first binding.
+            Binding b1 = rs.nextBinding() ;
+            //System.err.println(b1) ;
+            // ... then a possible timeout.
+            sleep(delay) ;
+            if ( exceptionExpected )
+                exceptionExpected(rs) ;
+            else
+                noException(rs) ;
+        } finally {
+            qExec.close() ;
+        }
+    }
+}
+

Modified: jena/trunk/jena-arq/src/test/java/org/apache/jena/atlas/lib/TestAlarmClock.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/org/apache/jena/atlas/lib/TestAlarmClock.java?rev=1475899&r1=1475898&r2=1475899&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/test/java/org/apache/jena/atlas/lib/TestAlarmClock.java (original)
+++ jena/trunk/jena-arq/src/test/java/org/apache/jena/atlas/lib/TestAlarmClock.java Thu Apr 25 18:46:46 2013
@@ -23,117 +23,71 @@ import static org.apache.jena.atlas.lib.
 import java.util.concurrent.atomic.AtomicInteger ;
 
 import org.apache.jena.atlas.junit.BaseTest ;
-import org.apache.jena.atlas.lib.AlarmClock ;
-import org.apache.jena.atlas.lib.Callback ;
-import org.apache.jena.atlas.lib.Pingback ;
 import org.junit.Test ;
 
 public class TestAlarmClock extends BaseTest
 {
-    static class CallbackTest implements Callback<Object> 
-    {
-        AtomicInteger count = new AtomicInteger(0) ;
-        
-        public int getCount() { return count.get() ; }
+    AtomicInteger count = new AtomicInteger(0) ;
+    Runnable callback = new Runnable() {
         
         @Override
-        public void proc(Object arg)
+        public void run()
         {
             count.getAndIncrement() ;
-        }
-    } ;
-    
-    CallbackTest callback = new CallbackTest() ;
+        }} ;
     
     @Test public void alarm_01()
     {
         AlarmClock alarmClock = new AlarmClock() ;
-        assertEquals(0, alarmClock.getCount()) ;
         // Very long - never happens.
-        Pingback<?> ping = alarmClock.add(callback, 10000000) ;
-        assertEquals(1, alarmClock.getCount()) ;
-        alarmClock.cancel(ping) ;
-        assertEquals(0, alarmClock.getCount()) ;
-        assertEquals(0, callback.getCount()) ;
+        alarmClock.add(callback, 10000000) ;
+        alarmClock.cancel(callback) ;
+        assertEquals(0, count.get()) ;
         alarmClock.release() ;
     }
     
     @Test public void alarm_02()
     {
         AlarmClock alarmClock = new AlarmClock() ;
-        assertEquals(0, alarmClock.getCount()) ;
         // Short - happens.
-        Pingback<?> ping = alarmClock.add(callback, 10) ;
+        alarmClock.add(callback, 10) ;
         sleep(100) ;
-        assertEquals(0, alarmClock.getCount()) ;
-        assertEquals(1, callback.getCount()) ;
-        
+        assertEquals(1, count.get()) ;
         // try to cancel anyway.
-        ping.cancel() ;
-        assertEquals(0, alarmClock.getCount()) ;
+        alarmClock.cancel(callback) ;
         alarmClock.release() ;
     }
 
     @Test public void alarm_03()
     {
         AlarmClock alarmClock = new AlarmClock() ;
-        assertEquals(0, alarmClock.getCount()) ;
-        Pingback<?> ping1 = alarmClock.add(callback, 100) ;
-        Pingback<?> ping2 = alarmClock.add(callback, 100000) ;
-        assertEquals(2, alarmClock.getCount()) ;
-        sleep(200) ;
+        alarmClock.add(callback, 50) ;
+        alarmClock.add(callback, 100000) ;
+        sleep(100) ;
         // ping1 went off.
-        assertEquals(1, alarmClock.getCount()) ;
-        assertEquals(1, callback.getCount()) ;
-        ping1.cancel() ;
-        
-        assertEquals(1, alarmClock.getCount()) ;
-        alarmClock.cancel(ping2) ;
-        assertEquals(0, alarmClock.getCount()) ;
-        assertEquals(1, callback.getCount()) ;
+        assertEquals(1, count.get()) ;
+        alarmClock.cancel(callback) ;
         alarmClock.release() ;
     }
 
     @Test public void alarm_04()
     {
         AlarmClock alarmClock = new AlarmClock() ;
-        assertEquals(0, alarmClock.getCount()) ;
-        Pingback<?> ping1 = alarmClock.add(callback, 10) ;
-        Pingback<?> ping2 = alarmClock.add(callback, 20) ;
-        assertEquals(2, alarmClock.getCount()) ;
+        alarmClock.add(callback, 10) ;
+        alarmClock.add(callback, 20) ;
         sleep(200) ;
         // ping1 went off.  ping2 went off.
-        assertEquals(0, alarmClock.getCount()) ;
-        assertEquals(2, callback.getCount()) ;
+        assertEquals(2, count.get()) ;
         alarmClock.release() ;
     }
 
     @Test public void alarm_05()
     {
         AlarmClock alarmClock = new AlarmClock() ;
-        assertEquals(0, alarmClock.getCount()) ;
-        Pingback<?> ping1 = alarmClock.add(callback, 100) ;
-        assertEquals(1, alarmClock.getCount()) ;
-        alarmClock.reset(ping1, 2000) ;
-        assertEquals(1, alarmClock.getCount()) ;
-        sleep(100) ;
-        assertEquals(1, alarmClock.getCount()) ;
+        alarmClock.add(callback, 100) ;
+        alarmClock.reset(callback, 2000) ;
+        sleep(50) ;
+        assertEquals(0, count.get()) ;
         alarmClock.release() ;
     }
-    
-    @Test public void alarm_06()
-    {
-        AlarmClock alarmClock = new AlarmClock() ;
-        assertEquals(0, alarmClock.getCount()) ;
-        Pingback<?> ping1 = alarmClock.add(callback, 50) ;
-        Pingback<?> ping2 = alarmClock.add(callback, 100) ;
-        assertEquals(2, alarmClock.getCount()) ;
-        alarmClock.reset(ping1, 2000) ;
-        assertEquals(2, alarmClock.getCount()) ;
-        sleep(200) ;    // just ping2 goes off.
-        assertEquals(1, callback.getCount()) ;
-        assertEquals(1, alarmClock.getCount()) ;
-        alarmClock.release() ;
-    }
-    
 }