You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@isis.apache.org by ah...@apache.org on 2018/09/21 09:11:18 UTC

[isis] branch v2 updated: ISIS-1974: adds tests for ThreadPoolSupport's sequential execution

This is an automated email from the ASF dual-hosted git repository.

ahuber pushed a commit to branch v2
in repository https://gitbox.apache.org/repos/asf/isis.git


The following commit(s) were added to refs/heads/v2 by this push:
     new e4d6dbf  ISIS-1974: adds tests for ThreadPoolSupport's sequential execution
e4d6dbf is described below

commit e4d6dbf3d7505ee400053ed9cb5a819f548fdc07
Author: Andi Huber <ah...@apache.org>
AuthorDate: Fri Sep 21 11:11:11 2018 +0200

    ISIS-1974: adds tests for ThreadPoolSupport's sequential execution
    
    Task-Url: https://issues.apache.org/jira/browse/ISIS-1974
---
 .../FutureWithIndexIntoFutureOfList.java           |  62 ++++++++++++
 .../core/runtime/threadpool/ThreadPoolSupport.java |  58 +++++++----
 .../runtime/threadpool/ThreadPoolSupportTest.java  | 108 +++++++++++++++++++++
 3 files changed, 208 insertions(+), 20 deletions(-)

diff --git a/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/FutureWithIndexIntoFutureOfList.java b/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/FutureWithIndexIntoFutureOfList.java
new file mode 100644
index 0000000..d6ca094
--- /dev/null
+++ b/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/FutureWithIndexIntoFutureOfList.java
@@ -0,0 +1,62 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.isis.core.runtime.threadpool;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+/** View of the single element at {@code index} of the result list produced by a shared {@code Future<List<T>>}. */
+final class FutureWithIndexIntoFutureOfList<T> implements Future<T> {
+    final Future<List<T>> commonFuture; // the shared future every call on this view is delegated to
+    final int index; // position of this view's element within the shared result list
+    // both fields are assigned exactly once, here
+    FutureWithIndexIntoFutureOfList(Future<List<T>> commonFuture, int index) {
+        this.commonFuture = commonFuture;
+        this.index = index;
+    }
+    /** Cancels the shared future itself, so any other view backed by the same future is affected as well. */
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return commonFuture.cancel(mayInterruptIfRunning);
+    }
+    /** Reports the cancellation state of the shared future, not of an individual element. */
+    @Override
+    public boolean isCancelled() {
+        return commonFuture.isCancelled();
+    }
+    /** Reports whether the shared future is done; elements do not complete individually. */
+    @Override
+    public boolean isDone() {
+        return commonFuture.isDone();
+    }
+    /** Waits for the shared future, then returns the element at {@code index} of its result list. */
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        return commonFuture.get().get(index);
+    }
+    /** Same as {@link #get()}, but waits on the shared future for at most the given timeout. */
+    @Override
+    public T get(long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        return commonFuture.get(timeout, unit).get(index);
+    }
+}
\ No newline at end of file
diff --git a/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java b/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java
index e900c74..e6d30c7 100644
--- a/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java
+++ b/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java
@@ -19,6 +19,11 @@
 
 package org.apache.isis.core.runtime.threadpool;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toList;
+import static org.apache.isis.commons.internal.base._Casts.uncheckedCast;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -33,12 +38,14 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import javax.annotation.Nullable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.isis.commons.internal.base._NullSafe;
 import org.apache.isis.commons.internal.collections._Lists;
 import org.apache.isis.commons.internal.context._Context;
 
@@ -59,7 +66,6 @@ public final class ThreadPoolSupport implements AutoCloseable {
 
     private final ThreadGroup group;
     private final ThreadPoolExecutor concurrentExecutor;
-    private final ThreadPoolExecutor sequentialExecutor;
 
     /**
      * @return the application-scoped singleton ThreadPoolSupport instance
@@ -68,7 +74,7 @@ public final class ThreadPoolSupport implements AutoCloseable {
         return _Context.computeIfAbsent(ThreadPoolSupport.class, __-> new ThreadPoolSupport());
     }
     
-    private ThreadPoolSupport() {
+    ThreadPoolSupport() {
 
         group = new ThreadGroup(ThreadPoolSupport.class.getName());
         
@@ -87,14 +93,6 @@ public final class ThreadPoolSupport implements AutoCloseable {
                 workQueueFactory.get(),
                 threadFactory);
         
-        sequentialExecutor = new ThreadPoolExecutor(
-                1,
-                1,
-                KEEP_ALIVE_TIME_SECS,
-                TimeUnit.SECONDS,
-                workQueueFactory.get(),
-                threadFactory);
-        
     }
     
     /*
@@ -102,12 +100,7 @@ public final class ThreadPoolSupport implements AutoCloseable {
      */
     @Override
     public void close() throws Exception {
-        try {
-            concurrentExecutor.shutdown();
-        } finally {
-            // in case the previous throws, continue execution here
-            sequentialExecutor.shutdown();            
-        }
+        concurrentExecutor.shutdown(); // the only executor left to shut down now that the sequential executor is removed
     }
 
     /**
@@ -132,16 +125,26 @@ public final class ThreadPoolSupport implements AutoCloseable {
     }
 
     /**
-     * Executes specified {@code callables} on the sequential executor in sequence, one by one.
+     * Wraps the specified {@code callables} into a single task, which is then executed on the default executor.
+     * That single task executes the {@code callables} in sequence, one by one, in list order.
      * @param callables nullable
      * @return non-null
      */
     public List<Future<Object>> invokeAllSequential(@Nullable final List<Callable<Object>> callables) {
-        return invokeAll(sequentialExecutor, callables);
+        if(_NullSafe.isEmpty(callables)) {
+            return emptyList();
+        }
+        final Future<List<Object>> commonFuture = 
+                uncheckedCast(invokeAll(singletonList(toSingleTask(callables))).get(0)); // the single task's result is the List built by toSingleTask
+        // one view per callable: the future at position i resolves to element i of the shared result list
+        return IntStream.range(0, callables.size())
+        .mapToObj(index->new FutureWithIndexIntoFutureOfList<Object>(commonFuture, index))
+        .collect(toList());
     }
 
     /**
-     * Executes specified {@code callables} on the sequential executor in sequence, one by one.
+     * Wraps specified {@code callables} into a single task, which is then executed on the default executor.    
+     * The single task itself executes specified {@code callables} in sequence, one by one.
      * @param callables nullable
      * @return non-null
      */
@@ -212,6 +215,11 @@ public final class ThreadPoolSupport implements AutoCloseable {
             return null;
         }
     }
+    
+    @Override
+    public String toString() {
+        return concurrentExecutor.toString(); // string form of the underlying concurrent executor
+    }
 
     // -- HELPERS
     
@@ -231,7 +239,7 @@ public final class ThreadPoolSupport implements AutoCloseable {
             final List<Callable<Object>> callables) {
         final long queuedAt = System.currentTimeMillis();
         return callables.stream()
-                .map(__ -> timed(__, executor.getQueue().size(), queuedAt))
+                .map(callable -> timed(callable, executor.getQueue().size(), queuedAt))
                 .collect(Collectors.toList());
     }
 
@@ -261,6 +269,16 @@ public final class ThreadPoolSupport implements AutoCloseable {
         };
     }
 
+    private Callable<Object> toSingleTask(final List<Callable<Object>> callables) { // bundles all callables into one Callable
+        return () -> {
+            final List<Object> resultList = _Lists.newArrayList(); // results, in the same order as the callables
+            for(Callable<Object> callable : callables) {
+                resultList.add(callable.call()); // an exception thrown here skips the remaining callables and propagates out of the single task
+            }
+            return resultList;
+        };
+    }
+    
     private static boolean isEmpty(Collection<?> x) { return x==null || x.size() == 0; }
 
 }
diff --git a/core/metamodel/src/test/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupportTest.java b/core/metamodel/src/test/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupportTest.java
new file mode 100644
index 0000000..655f572
--- /dev/null
+++ b/core/metamodel/src/test/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupportTest.java
@@ -0,0 +1,108 @@
+package org.apache.isis.core.runtime.threadpool;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.isis.commons.internal.collections._Lists;
+/** Tests that {@code ThreadPoolSupport#invokeAllSequential} runs its callables in list order. */
+class ThreadPoolSupportTest {
+
+    private ThreadPoolSupport pool;
+    private Buffer buffer;
+    // a fresh pool and an empty buffer are created for every test
+    @BeforeEach
+    void setUp() throws Exception {
+        pool = new ThreadPoolSupport();
+        buffer = new Buffer();
+    }
+    // closes the per-test pool again
+    @AfterEach
+    void tearDown() throws Exception {
+        pool.close();
+    }
+    /** Baseline without submitting anything to the pool: direct appends keep their order. */
+    @Test
+    void shouldPreserveSequence_whenNoThread() {
+        // printed for information only; nothing is asserted on the pool's string form
+        System.out.println(""+pool);
+        
+        buffer.append("A");
+        buffer.append("B");
+        buffer.append("C");
+        Assertions.assertEquals("ABC", buffer.toString());
+    }
+    /** Runs ten ordered tasks 256 times through invokeAllSequential; the order of side effects and of results must hold. */
+    @Test
+    void shouldPreserveSequence_whenSequentialExecution() {
+        
+        final List<Future<Object>> futures = _Lists.newArrayList();
+        // every task appends its letter to the shared buffer (side effect) and returns that same letter (result)
+        final List<Callable<Object>> tasks = _Lists.of(
+                ()->{buffer.append("A"); return "A";},
+                ()->{buffer.append("B"); return "B";},
+                ()->{buffer.append("C"); return "C";},
+                ()->{buffer.append("D"); return "D";},
+                ()->{buffer.append("E"); return "E";},
+                ()->{buffer.append("F"); return "F";},
+                ()->{buffer.append("G"); return "G";},
+                ()->{buffer.append("H"); return "H";},
+                ()->{buffer.append("I"); return "I";},
+                ()->{buffer.append("J"); return "J";}
+        );
+        // NOTE(review): joinGatherFailures presumably blocks until the round has finished, so rounds would not overlap; confirm
+        for(int i=0; i<256; ++i) {
+            final List<Future<Object>> taskFutures = pool.invokeAllSequential(tasks);
+            pool.joinGatherFailures(taskFutures);
+            futures.addAll(taskFutures);
+        }
+        // a failing get() contributes the exception's text instead of a letter, which leaves residue and fails the assertion below
+        final String resultString = futures.stream()
+        .map(future->{try {
+            return future.get();
+        } catch (InterruptedException | ExecutionException e) {
+            return ""+e;
+        }})
+        .map(result->""+result)
+        .collect(Collectors.joining());
+        // after removing every complete "ABCDEFGHIJ" run nothing may remain; residue means reordering, loss or failure
+        final String buffer_allowedSequencesRemoved = buffer.toString().replace("ABCDEFGHIJ", "");
+        final String result_allowedSequencesRemoved = resultString.replace("ABCDEFGHIJ", "");
+        
+        
+        Assertions.assertEquals("", buffer_allowedSequencesRemoved);
+        Assertions.assertEquals("", result_allowedSequencesRemoved);
+        
+    }
+    
+    // -- HELPER
+    
+    /** Append-only text sink written by the tasks and read by the test; all access to the wrapped StringBuffer is synchronized on it. */
+    private static class Buffer {
+        
+        final StringBuffer sb = new StringBuffer();
+
+        public Buffer append(final CharSequence c) {
+            synchronized (sb) {
+                sb.append(c);    
+            }
+            return this;
+        }
+        
+        @Override
+        public String toString() {
+            synchronized (sb) {
+                return sb.toString();    
+            }
+        }
+        
+    }
+
+}