You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@isis.apache.org by da...@apache.org on 2018/09/14 16:11:43 UTC

[isis] 02/19: ISIS-1974: forward ports improvements in ThreadPoolSupport from 'master' into 'v2'

This is an automated email from the ASF dual-hosted git repository.

danhaywood pushed a commit to branch v2
in repository https://gitbox.apache.org/repos/asf/isis.git

commit de6aade6c0b04e6ea53a3682fbedbe9165304844
Author: danhaywood <da...@haywood-associates.co.uk>
AuthorDate: Fri Sep 14 11:52:15 2018 +0100

    ISIS-1974: forward ports improvements in ThreadPoolSupport from 'master' into 'v2'
---
 .../core/runtime/threadpool/ThreadPoolSupport.java | 122 ++++++++++++++++++---
 .../system/session/IsisSessionFactoryBuilder.java  |   4 +-
 .../wicket/viewer/IsisWicketApplication.java       |   5 +-
 3 files changed, 110 insertions(+), 21 deletions(-)

diff --git a/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java b/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java
index 13f220a..a9f3d54 100644
--- a/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java
+++ b/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java
@@ -19,8 +19,8 @@
 
 package org.apache.isis.core.runtime.threadpool;
 
-import static org.apache.isis.commons.internal.base._NullSafe.isEmpty;
-
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -32,9 +32,12 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
+import com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +56,9 @@ public final class ThreadPoolSupport implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolSupport.class);
 
+    private final static int KEEP_ALIVE_TIME_SECS = 5;
+    private final static int QUEUE_CAPACITY = 5000;
+
     private final ThreadGroup group;
     private final ThreadPoolExecutor concurrentExecutor;
     private final ThreadPoolExecutor sequentialExecutor;
@@ -70,24 +76,26 @@ public final class ThreadPoolSupport implements AutoCloseable {
 
         final int corePoolSize = Runtime.getRuntime().availableProcessors();
         final int maximumPoolSize = Runtime.getRuntime().availableProcessors();
-        final int keepAliveTimeSecs = 5;
-        
+
         final ThreadFactory threadFactory = (Runnable r) -> new Thread(group, r);
 
-        final int queueCapacity = 25;
-        final Supplier<BlockingQueue<Runnable>> workQueueFactory = 
-                ()->new LinkedBlockingQueue<>(queueCapacity);
+        final Supplier<BlockingQueue<Runnable>> workQueueFactory =
+                ()->new LinkedBlockingQueue<>(QUEUE_CAPACITY);
         
         
         concurrentExecutor = new ThreadPoolExecutor(
                 corePoolSize,
                 maximumPoolSize,
-                keepAliveTimeSecs, TimeUnit.SECONDS,
+                KEEP_ALIVE_TIME_SECS,
+                TimeUnit.SECONDS,
                 workQueueFactory.get(),
                 threadFactory);
         
-        sequentialExecutor = new ThreadPoolExecutor(1, 1, // fixed size = 1
-                keepAliveTimeSecs, TimeUnit.MILLISECONDS,
+        sequentialExecutor = new ThreadPoolExecutor(
+                1,
+                1,
+                KEEP_ALIVE_TIME_SECS,
+                TimeUnit.SECONDS,
                 workQueueFactory.get(),
                 threadFactory);
         
@@ -117,6 +125,16 @@ public final class ThreadPoolSupport implements AutoCloseable {
     }
 
     /**
+     * Executes specified {@code callables} on the default executor.
+     * See {@link ThreadPoolExecutor#invokeAll(java.util.Collection)}
+     * @param callables the tasks to run; the varargs array itself must not be {@code null}
+     * @return non-null
+     */
+    public List<Future<Object>> invokeAll(final Callable<Object>... callables) {
+        return invokeAll(Arrays.asList(callables));
+    }
+
+    /**
      * Executes specified {@code callables} on the sequential executor in sequence, one by one.
      * @param callables nullable
      * @return non-null
@@ -124,13 +142,23 @@ public final class ThreadPoolSupport implements AutoCloseable {
     public List<Future<Object>> invokeAllSequential(@Nullable final List<Callable<Object>> callables) {
         return invokeAll(sequentialExecutor, callables);
     }
-    
+
+    /**
+     * Executes specified {@code callables} on the sequential executor in sequence, one by one.
+     * @param callables the tasks to run; the varargs array itself must not be {@code null}
+     * @return non-null
+     */
+    public List<Future<Object>> invokeAllSequential(final Callable<Object>... callables) {
+        return invokeAllSequential(Arrays.asList(callables));
+    }
+
+
     /**
      * Waits if necessary for the computation to complete. (Suppresses checked exceptions.)
      * @param futures
      * @return list of computation results.
      */
-    public static List<Object> join(@Nullable final List<Future<Object>> futures) {
+    public List<Object> join(@Nullable final List<Future<Object>> futures) {
         if (futures == null) {
             return null;
         }
@@ -148,31 +176,93 @@ public final class ThreadPoolSupport implements AutoCloseable {
         }
     }
 
+    public List<Object> joinGatherFailures(final List<Future<Object>> futures) {
+        if (futures == null) {
+            return null;
+        }
+
+        final long t0 = System.currentTimeMillis();
+        try{
+            final List<Object> returnValues = Lists.newArrayList();
+            for (Future<Object> future : futures) {
+                final Object result;
+                try {
+                    result = future.get();
+                } catch (InterruptedException | ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+                returnValues.add(result);
+            }
+            return returnValues;
+        } finally {
+            final long t1 = System.currentTimeMillis();
+            LOG.info("join'ing {} tasks: waited {} milliseconds ", futures.size(), (t1-t0));
+        }
+    }
+
+
     /**
      * Waits if necessary for the computation to complete. (Suppresses checked exceptions.)
      * @param future
      * @return the computation result
      */
-    public static Object join(final Future<Object> future) {
+    public Object join(final Future<Object> future) {
         try {
             return future.get();
         } catch (InterruptedException | ExecutionException e) {
             // ignore
+            return null;
         }
-        return null;
     }
 
-    // -- HELPER
+    // -- HELPERS
     
     private List<Future<Object>> invokeAll(ThreadPoolExecutor executor, @Nullable final List<Callable<Object>> callables) {
         if(isEmpty(callables)) {
             return Collections.emptyList();
         }
         try {
-            return executor.invokeAll(callables);
+            return executor.invokeAll(timed(executor, callables));
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }
     }
 
+    private static List<Callable<Object>> timed(
+            final ThreadPoolExecutor executor,
+            final List<Callable<Object>> callables) {
+        final long queuedAt = System.currentTimeMillis();
+        return callables.stream()
+                .map(__ -> timed(__, executor.getQueue().size(), queuedAt))
+                .collect(Collectors.toList());
+    }
+
+    private static Callable<Object> timed(
+            final Callable<Object> callable,
+            final int queueSize,
+            final long queuedAt) {
+
+        return () -> {
+            final long startedAt = System.currentTimeMillis();
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("START: workQueue.size: {}, waited for: {}ms, {}",
+                        queueSize,
+                        startedAt - queuedAt,
+                        callable.toString());
+            }
+            try {
+                return callable.call();
+            } finally {
+                final long completedAt = System.currentTimeMillis();
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("END: completed in: {}ms, {}",
+                            completedAt - startedAt,
+                            callable.toString());
+                }
+            }
+        };
+    }
+
+    private static boolean isEmpty(Collection<?> x) { return x==null || x.size() == 0; }
+
 }
diff --git a/core/runtime/src/main/java/org/apache/isis/core/runtime/system/session/IsisSessionFactoryBuilder.java b/core/runtime/src/main/java/org/apache/isis/core/runtime/system/session/IsisSessionFactoryBuilder.java
index 6b95018..32bac23 100644
--- a/core/runtime/src/main/java/org/apache/isis/core/runtime/system/session/IsisSessionFactoryBuilder.java
+++ b/core/runtime/src/main/java/org/apache/isis/core/runtime/system/session/IsisSessionFactoryBuilder.java
@@ -183,7 +183,7 @@ public class IsisSessionFactoryBuilder {
             // yet inject.
             _Context.putSingleton(IsisSessionFactory.class, isisSessionFactory);
 
-            final List<Callable<Object>> tasks = _Lists.<Callable<Object>>of(
+            final List<Callable<Object>> tasks = _Lists.of(
                     ()->{
                         // time to initialize...
                         specificationLoader.init();
@@ -217,7 +217,7 @@ public class IsisSessionFactoryBuilder {
             final List<Future<Object>> futures = ThreadPoolSupport.getInstance().invokeAll(tasks);
             
             // wait on this thread for tasks to complete
-            ThreadPoolSupport.join(futures); 
+            ThreadPoolSupport.getInstance().join(futures);
 
             isisSessionFactory.constructServices();
 
diff --git a/core/viewer-wicket-impl/src/main/java/org/apache/isis/viewer/wicket/viewer/IsisWicketApplication.java b/core/viewer-wicket-impl/src/main/java/org/apache/isis/viewer/wicket/viewer/IsisWicketApplication.java
index 7f64bcc..73b6c17 100644
--- a/core/viewer-wicket-impl/src/main/java/org/apache/isis/viewer/wicket/viewer/IsisWicketApplication.java
+++ b/core/viewer-wicket-impl/src/main/java/org/apache/isis/viewer/wicket/viewer/IsisWicketApplication.java
@@ -19,8 +19,6 @@
 
 package org.apache.isis.viewer.wicket.viewer;
 
-import static java.util.Objects.requireNonNull;
-
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.Collections;
@@ -126,6 +124,7 @@ import de.agilecoders.wicket.core.settings.IBootstrapSettings;
 import de.agilecoders.wicket.webjars.WicketWebjars;
 import de.agilecoders.wicket.webjars.settings.IWebjarsSettings;
 import de.agilecoders.wicket.webjars.settings.WebjarsSettings;
+import static java.util.Objects.requireNonNull;
 import net.ftlines.wicketsource.WicketSource;
 
 /**
@@ -401,7 +400,7 @@ implements ComponentFactoryRegistryAccessor, PageClassRegistryAccessor, WicketVi
             LOG.error("Failed to initialize", ex);
             throw ex;
         } finally {
-            ThreadPoolSupport.join(futures);
+            ThreadPoolSupport.getInstance().join(futures);
         }
     }