You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2016/08/13 14:06:02 UTC

[2/2] jena git commit: ThreadAction - run code in a controlled manner on another thread

ThreadAction - run code in a controlled manner on another thread

ThreadAction runs code in a controlled manner on another thread and 
also passes exceptions back to the iitiatign thread.

Useful for testing as the sequence of actiosn can be carefully
controlled with Semaphores.

Rework ThreadTxn to use ThreadAction.


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/244e0ace
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/244e0ace
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/244e0ace

Branch: refs/heads/master
Commit: 244e0ace70c8b8a6ff128ca61aaf3061b7647992
Parents: b0e4dc3
Author: Andy Seaborne <an...@apache.org>
Authored: Sat Aug 13 15:05:38 2016 +0100
Committer: Andy Seaborne <an...@apache.org>
Committed: Sat Aug 13 15:05:38 2016 +0100

----------------------------------------------------------------------
 .../org/apache/jena/system/ThreadAction.java    | 140 +++++++++++++++++++
 .../java/org/apache/jena/system/ThreadTxn.java  | 134 +++++-------------
 .../java/org/apache/jena/system/TS_System.java  |   1 +
 .../apache/jena/system/TestThreadAction.java    | 140 +++++++++++++++++++
 .../org/apache/jena/system/TestTxnThread.java   |  14 +-
 5 files changed, 324 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/244e0ace/jena-arq/src/main/java/org/apache/jena/system/ThreadAction.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/system/ThreadAction.java b/jena-arq/src/main/java/org/apache/jena/system/ThreadAction.java
new file mode 100644
index 0000000..d086a5c
--- /dev/null
+++ b/jena-arq/src/main/java/org/apache/jena/system/ThreadAction.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.system;
+
+import java.util.Objects ;
+import java.util.concurrent.Executor ;
+import java.util.concurrent.Executors ;
+import java.util.concurrent.Semaphore ;
+import java.util.concurrent.atomic.AtomicReference ;
+
+import org.apache.jena.atlas.logging.Log ;
+
+/**
+ * An action that will happen on a different thread later when {@link #run}
+ * is called. A thread is created and started during a call to the
+ * {#link create()}. The associated Runnable is called when {@link #run}
+ * is called.
+ */ 
+public class ThreadAction {
+    private final Semaphore semaStart   = new Semaphore(0, true) ;
+    private final Semaphore semaFinish  = new Semaphore(0, true) ;
+    
+    // Catch the two kinds that do not need a "throws" clause. 
+    private final AtomicReference<RuntimeException> thrownRuntimeException = new AtomicReference<>(null) ; 
+    private final AtomicReference<Error> thrownError = new AtomicReference<>(null) ;
+    private final Runnable action ;
+    
+    private ThreadAction(Runnable action) {
+        this.action = action ;
+    }
+    
+    /**
+     * Perform the Runnable, reporting any 
+     * {@link java.lang.RuntimeException} or {@link java.lang.Error}
+     */
+    public void run() { 
+        // Signal the thread, which is already running and inside
+        // the transaction, can now call the action.
+        semaStart.release();
+        // Wait for it to finish.
+        semaFinish.acquireUninterruptibly() ;
+        if ( thrownError.get() != null )
+            throw thrownError.get() ;
+        if ( thrownRuntimeException.get() != null )
+            throw thrownRuntimeException.get() ;
+    }
+    
+    // Called on the async thread.
+    private void trigger() {
+        try { action.run(); }
+        catch (Error error) { thrownError.set(error) ; throw error  ;}
+        catch (RuntimeException ex) { thrownRuntimeException.set(ex) ; throw ex ; }
+    }
+    
+    // System-shared executor better.
+    private static Executor executor = Executors.newCachedThreadPool() ;
+
+    /** Create a {@code ThreadAction}.
+     * @param action The main action run when {@link #run()} called.
+     * @return ThreadAction
+     */
+    public static ThreadAction create(Runnable action) {
+        return create(null, action, null) ; 
+    }
+    
+    /** Create a {@code ThreadAction}.  
+     * 
+     * @param before
+     *      Action to call as the thread starts before {@link #run()}.
+     *      Can be null.
+     * @param action 
+     *      The main action run when {@link #run()} called.
+     *      Any exceptions are passed over to {@link #run()} 
+     *      and propagted on the {@link #run()} thread. 
+     * @param after  
+     *      Action to run after the main action.
+     *      Can be null.
+     * @return ThreadAction
+     */
+    public static ThreadAction create(Runnable before, Runnable action, Runnable after) {
+        Objects.requireNonNull(action) ;
+        
+        ThreadAction threadAction = new ThreadAction(action) ;
+        // Startup semaphore so that the thread has started and entered the
+        // transaction by the time we exit this setup function. 
+        Semaphore semaCreateStart = new Semaphore(0, true) ;
+        executor.execute( ()-> {
+            try { 
+                if ( before != null )
+                    before.run();
+            } catch (Throwable th) {
+                Log.warn(ThreadAction.class, "Throwable in 'before' action: "+th.getMessage(), th);
+                semaCreateStart.release() ;
+                threadAction.semaFinish.release() ;
+                return ;
+            }
+            // Signal the creator (see below) that the action has started.
+            semaCreateStart.release() ;
+
+            // Wait for the signal to run the action.
+            threadAction.semaStart.acquireUninterruptibly();
+
+            try {
+                // Perform the action, catching and recording any RuntimeException or Error. 
+                threadAction.trigger() ;
+            }
+            catch (Throwable ex) {
+                // Surpress. trigger() recorded it and it is passed
+                // to the caller in run(). 
+            }
+            try { 
+                if ( after != null )
+                    after.run() ;
+            } catch (Throwable th) {
+                Log.warn(ThreadAction.class, "Throwable in 'after' action: "+th.getMessage(), th);
+                // Drop through.
+            }
+            threadAction.semaFinish.release() ;
+        }) ;
+        // Don't return until the thread has started.
+        semaCreateStart.acquireUninterruptibly();
+        return threadAction ;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/244e0ace/jena-arq/src/main/java/org/apache/jena/system/ThreadTxn.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/system/ThreadTxn.java b/jena-arq/src/main/java/org/apache/jena/system/ThreadTxn.java
index 06e735e..4b2fc46 100644
--- a/jena-arq/src/main/java/org/apache/jena/system/ThreadTxn.java
+++ b/jena-arq/src/main/java/org/apache/jena/system/ThreadTxn.java
@@ -18,21 +18,15 @@
 
 package org.apache.jena.system;
 
-import java.util.Objects ;
-import java.util.concurrent.Executor ;
-import java.util.concurrent.Executors ;
-import java.util.concurrent.Semaphore ;
-import java.util.concurrent.atomic.AtomicReference ;
-
 import org.apache.jena.query.ReadWrite ;
 import org.apache.jena.sparql.core.Transactional ;
 
 /**
- * An action that will happen on a different thread later when {@link #run} is
+ * An action that will happen on a different thread later when {@link ThreadAction#run} is
  * called. A thread is created and the transaction started during a call to the
  * creation operations {@link #threadTxnRead} or {@link #threadTxnWrite}.
  * The associated Runnable is called and the transaction completed when
- * {@link #run} is called. Being on a thread, the state of the world the
+ * {@link ThreadAction#run} is called. Being on a thread, the state of the world the
  * forked transaction sees is outside the creating thread which may itself be in a
  * transaction. Warning: creating a write transaction inside a write transaction
  * will cause deadlock.
@@ -41,111 +35,57 @@ public class ThreadTxn {
     // ---- Thread
 
     /** Create a thread-backed delayed READ transaction action. 
-     * Call {@link ThreadTxn#run} to perform the read transaction.
+     * Call {@link ThreadAction#run} to perform the read transaction.
      */
-    public static ThreadTxn threadTxnRead(Transactional trans, Runnable action) {
-        return ThreadTxn.create(trans, ReadWrite.READ, action, false) ;
+    public static ThreadAction threadTxnRead(Transactional trans, Runnable action) {
+        return create(trans, ReadWrite.READ, action, false) ;
     }
 
     /** Create a thread-backed delayed WRITE action.
-     * Call {@link ThreadTxn#run} to perform the write transaction.
+     * Call {@link ThreadAction#run} to perform the write transaction.
      * (If called from inside a write transaction on the {@code trans},
      * this will deadlock.)
      */
-    public static ThreadTxn threadTxnWrite(Transactional trans, Runnable action) {
-        return ThreadTxn.create(trans, ReadWrite.WRITE, action, true) ;
+    public static ThreadAction threadTxnWrite(Transactional trans, Runnable action) {
+        return create(trans, ReadWrite.WRITE, action, true) ;
     }
    
     /** Create a thread-backed delayed WRITE-abort action (mainly for testing). */
-    public static ThreadTxn threadTxnWriteAbort(Transactional trans, Runnable action) {
-        return ThreadTxn.create(trans, ReadWrite.WRITE, action, false) ;
-    }
-    
-    private final Semaphore semaStart   = new Semaphore(0, true) ;
-    private final Semaphore semaFinish  = new Semaphore(0, true) ;
-    
-    // Catch the two kinds that do not need a "throws" clause. 
-    private final AtomicReference<RuntimeException> thrownRuntimeException = new AtomicReference<>(null) ; 
-    private final AtomicReference<Error> thrownError = new AtomicReference<>(null) ;
-    private final Runnable action ;
-    
-    private ThreadTxn(Runnable action) {
-        this.action = action ;
+    public static ThreadAction threadTxnWriteAbort(Transactional trans, Runnable action) {
+        return create(trans, ReadWrite.WRITE, action, false) ;
     }
-    
-    /**
-     * Perform the Runnable, reporting any 
-     * {@link java.lang.RuntimeException} or {@link java.lang.Error}
-     */
-    public void run() { 
-        // Signal the thread, which is already running and inside
-        // the transaction, can now call the action.
-        semaStart.release();
-        semaFinish.acquireUninterruptibly() ;
-        if ( thrownError.get() != null )
-            throw thrownError.get() ;
-        if ( thrownRuntimeException.get() != null )
-            throw thrownRuntimeException.get() ;
+
+    /*package*/ static ThreadAction create(Transactional trans, ReadWrite mode, Runnable action, boolean isCommit) {
+        return ThreadAction.create
+            ( beforeAction(trans, mode, isCommit)
+            , action
+            , afterAction(trans, mode, isCommit) ) ;
     }
     
-    // Called on the async thread.
-    private void trigger() {
-        try { action.run(); }
-        catch (Error error) { thrownError.set(error) ; throw error  ;}
-        catch (RuntimeException ex) { thrownRuntimeException.set(ex) ; throw ex ; }
+    private static Runnable beforeAction(Transactional trans, ReadWrite mode, boolean isCommit) {
+        return ()-> trans.begin(mode) ;
     }
     
-    // System-shared executor better.
-    private static Executor executor = Executors.newCachedThreadPool() ;
-    
-    /*package*/ static ThreadTxn create(Transactional trans, ReadWrite mode, Runnable action, boolean isCommit) {
-        Objects.requireNonNull(trans) ;
-        Objects.requireNonNull(mode) ;
-        Objects.requireNonNull(action) ;
-        
-        ThreadTxn threadAction = new ThreadTxn(action) ;
-        // Startup semaphore so that the thread has started and entered the
-        // transaction by the time we exit this setup function. 
-        Semaphore semaCreateStart = new Semaphore(0, true) ;
-        executor.execute( ()-> {
-            // NB. trans.begin then semaCreateStartup.release() ;
-            // This ensures that the transaction has really started.
-            trans.begin(mode) ;
-            
-            // Signal the creator (see below) that the transaction has started.
-            semaCreateStart.release() ;
-            
-            // Wait for the signal to run the action.
-            threadAction.semaStart.acquireUninterruptibly();
-            
-            try {
-                // Perform the action, catching and recording any RuntimeException or Error. 
-                threadAction.trigger() ;
-                
-                // Finish transaction (if no throwable)
-                switch (mode) {
-                    case WRITE : {
-                        if ( isCommit )
-                            trans.commit() ;
-                        else
-                            trans.abort() ;
-                        trans.end() ;
-                    }
-                    case READ : {
-                        if ( isCommit )
-                            trans.commit() ;
-                        trans.end() ;
-                    }
+    private static Runnable afterAction(Transactional trans, ReadWrite mode, boolean isCommit) {
+        return () -> {
+            // Finish transaction (if no throwable)
+            switch (mode) {
+                case WRITE : {
+                    if ( isCommit )
+                        trans.commit() ;
+                    else
+                        trans.abort() ;
+                    trans.end() ;
+                    break ;
+                }
+                case READ : {
+                    if ( isCommit )
+                        trans.commit() ;
+                    trans.end() ;
+                    break ;
                 }
             }
-            catch (Throwable ex) {
-                // Surpress. trigger() recorded it and it is passed
-                // to the caller in run(). 
-            }
-            finally { threadAction.semaFinish.release() ; }
-        }) ;
-        // Don't return until the transaction has started.
-        semaCreateStart.acquireUninterruptibly();
-        return threadAction ;
+        } ;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/244e0ace/jena-arq/src/test/java/org/apache/jena/system/TS_System.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/system/TS_System.java b/jena-arq/src/test/java/org/apache/jena/system/TS_System.java
index e8c6f05..103d0b1 100644
--- a/jena-arq/src/test/java/org/apache/jena/system/TS_System.java
+++ b/jena-arq/src/test/java/org/apache/jena/system/TS_System.java
@@ -24,6 +24,7 @@ import org.junit.runners.Suite ;
 @RunWith(Suite.class)
 @Suite.SuiteClasses( {
     TestCounter.class
+    , TestThreadAction.class
     , TestTxnLifecycle.class
     , TestTxn.class
     , TestTxnThread.class

http://git-wip-us.apache.org/repos/asf/jena/blob/244e0ace/jena-arq/src/test/java/org/apache/jena/system/TestThreadAction.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/system/TestThreadAction.java b/jena-arq/src/test/java/org/apache/jena/system/TestThreadAction.java
new file mode 100644
index 0000000..20beaae
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/system/TestThreadAction.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.system;
+
+import static org.junit.Assert.assertEquals ;
+
+import java.util.concurrent.atomic.AtomicInteger ;
+
+import org.apache.log4j.Level ;
+import org.apache.log4j.Logger ;
+import org.junit.AfterClass ;
+import org.junit.BeforeClass ;
+import org.junit.Test ;
+
+public class TestThreadAction {
+    
+    private static Logger logger = Logger.getLogger(ThreadAction.class) ; 
+    private static Level  level ;
+    
+
+    @BeforeClass
+    static public void beforeClass() {
+        level = logger.getLevel() ;
+        // ThreadAction logs warning on exceptions in before and after.
+        logger.setLevel(Level.ERROR) ;
+    }
+
+    @AfterClass
+    static public void afterClass() {
+        // Restore logging setting.
+        logger.setLevel(level) ;
+    }
+    
+    @Test public void action_01() {
+        AtomicInteger x = new AtomicInteger(0) ;
+        ThreadAction action = ThreadAction.create(()->x.incrementAndGet()) ;
+        assertEquals(0, x.get()) ;
+        action.run() ;
+        assertEquals(1, x.get()) ;
+    }
+    
+    @Test public void action_02() {
+        AtomicInteger x = new AtomicInteger(0) ;
+        ThreadAction action = ThreadAction.create(null, ()->x.incrementAndGet(), null) ;
+        assertEquals(0, x.get()) ;
+        action.run() ;
+        assertEquals(1, x.get()) ;
+    }
+    
+    @Test public void action_03() {
+        AtomicInteger before = new AtomicInteger(0) ;
+        AtomicInteger runnable = new AtomicInteger(0) ;
+        AtomicInteger after = new AtomicInteger(0) ;
+        ThreadAction action = ThreadAction.create(()->before.incrementAndGet(),
+                                                  ()->runnable.incrementAndGet(),
+                                                  ()->after.incrementAndGet()) ;
+        action.run() ;
+        assertEquals(1, before.get()) ;
+        assertEquals(1, runnable.get()) ;
+        assertEquals(1, after.get()) ;
+    }
+    
+    // Make silent.
+    @Test public void action_04() {
+        AtomicInteger before = new AtomicInteger(0) ;
+        AtomicInteger runnable = new AtomicInteger(0) ;
+        AtomicInteger after = new AtomicInteger(0) ;
+        ThreadAction action = ThreadAction.create
+            (()->{ before.incrementAndGet() ; bang() ; }, 
+             ()->runnable.incrementAndGet(),
+             ()->after.incrementAndGet()) ;
+        action.run() ;
+        assertEquals(1, before.get()) ;
+        assertEquals(0, runnable.get()) ;
+        assertEquals(0, after.get()) ;
+    }
+
+    @Test(expected=TestThreadActionException.class)
+    public void action_05() {
+        AtomicInteger before = new AtomicInteger(0) ;
+        AtomicInteger runnable = new AtomicInteger(0) ;
+        AtomicInteger after = new AtomicInteger(0) ;
+        ThreadAction action = ThreadAction.create(()->before.incrementAndGet(), 
+                                                  ()->{ runnable.incrementAndGet() ; bang() ; } ,
+                                                  ()->after.incrementAndGet()) ;
+        action.run() ;
+        assertEquals(1, before.get()) ;
+        assertEquals(1, runnable.get()) ;
+        assertEquals(0, after.get()) ;
+    }
+
+    @Test
+    public void action_06() {
+        AtomicInteger before = new AtomicInteger(0) ;
+        AtomicInteger runnable = new AtomicInteger(0) ;
+        AtomicInteger after = new AtomicInteger(0) ;
+        ThreadAction action = ThreadAction.create(()->before.incrementAndGet(), 
+                                                  ()->{ runnable.incrementAndGet() ; bang() ; } ,
+                                                  ()->after.incrementAndGet()) ;
+        try { action.run() ; }
+        catch (TestThreadActionException ex) {}
+        assertEquals(1, before.get()) ;
+        assertEquals(1, runnable.get()) ;
+        assertEquals(1, after.get()) ;
+    }
+
+    @Test
+    public void action_07() {
+        AtomicInteger before = new AtomicInteger(0) ;
+        AtomicInteger runnable = new AtomicInteger(0) ;
+        AtomicInteger after = new AtomicInteger(0) ;
+        ThreadAction action = ThreadAction.create(()->before.incrementAndGet(), 
+                                                  ()->runnable.incrementAndGet() ,
+                                                  ()->{ after.incrementAndGet(); bang() ; } ) ;
+        action.run() ;
+        assertEquals(1, before.get()) ;
+        assertEquals(1, runnable.get()) ;
+        assertEquals(1, after.get()) ;
+    }
+
+    private static void bang() { throw new TestThreadActionException() ; } 
+
+    static class TestThreadActionException extends RuntimeException {}
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/244e0ace/jena-arq/src/test/java/org/apache/jena/system/TestTxnThread.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/system/TestTxnThread.java b/jena-arq/src/test/java/org/apache/jena/system/TestTxnThread.java
index 2af8db9..d5dc58d 100644
--- a/jena-arq/src/test/java/org/apache/jena/system/TestTxnThread.java
+++ b/jena-arq/src/test/java/org/apache/jena/system/TestTxnThread.java
@@ -29,29 +29,27 @@ import org.junit.Test ;
 public class TestTxnThread {
 
     TxnCounter counter = new TxnCounter(0) ; 
-
-    
     
     // Tests for thread transactions.
     
     @Test public void txnThread_1() {
-        ThreadTxn t = ThreadTxn.threadTxnRead(counter, ()->{}) ;
+        ThreadAction t = ThreadTxn.threadTxnRead(counter, ()->{}) ;
         t.run();
     }
     
     @Test public void txnThread_2() {
-        ThreadTxn t = ThreadTxn.threadTxnWrite(counter, ()-> fail("")) ;
+        ThreadAction t = ThreadTxn.threadTxnWrite(counter, ()-> fail("")) ;
     }
 
     @Test(expected=AssertionError.class)
     public void txnThread_3() {
-        ThreadTxn t = ThreadTxn.threadTxnWrite(counter, ()-> fail("")) ;
+        ThreadAction t = ThreadTxn.threadTxnWrite(counter, ()-> fail("")) ;
         t.run() ;
     }
 
     @Test public void txnThread_10() {
         long x1 = counter.get() ;  
-        ThreadTxn t = ThreadTxn.threadTxnWrite(counter, ()->{ counter.inc() ;}) ;
+        ThreadAction t = ThreadTxn.threadTxnWrite(counter, ()->{ counter.inc() ;}) ;
         long x2 = counter.get() ;
         assertEquals("x2", x1, x2) ;
         t.run() ;
@@ -64,7 +62,7 @@ public class TestTxnThread {
         Txn.execWrite(counter, ()->{
             counter.inc();
             // Read the "before" state
-            ThreadTxn t = ThreadTxn.threadTxnRead(counter, ()->{ 
+            ThreadAction t = ThreadTxn.threadTxnRead(counter, ()->{ 
                 long z1 = counter.get() ; 
                 assertEquals("Thread read", x1, z1) ;
             }) ;
@@ -77,7 +75,7 @@ public class TestTxnThread {
 
     @Test public void txnThread_12() {
         long x1 = counter.get() ;  
-        ThreadTxn t = ThreadTxn.threadTxnRead(counter, () -> {
+        ThreadAction t = ThreadTxn.threadTxnRead(counter, () -> {
             long z1 = counter.get() ;
             assertEquals("Thread", x1, z1) ;
         }) ;