You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@isis.apache.org by da...@apache.org on 2018/09/14 16:11:43 UTC
[isis] 02/19: ISIS-1974: forward ports improvements in ThreadPoolSupport from 'master' into 'v2'
This is an automated email from the ASF dual-hosted git repository.
danhaywood pushed a commit to branch v2
in repository https://gitbox.apache.org/repos/asf/isis.git
commit de6aade6c0b04e6ea53a3682fbedbe9165304844
Author: danhaywood <da...@haywood-associates.co.uk>
AuthorDate: Fri Sep 14 11:52:15 2018 +0100
ISIS-1974: forward ports improvements in ThreadPoolSupport from 'master' into 'v2'
---
.../core/runtime/threadpool/ThreadPoolSupport.java | 122 ++++++++++++++++++---
.../system/session/IsisSessionFactoryBuilder.java | 4 +-
.../wicket/viewer/IsisWicketApplication.java | 5 +-
3 files changed, 110 insertions(+), 21 deletions(-)
diff --git a/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java b/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java
index 13f220a..a9f3d54 100644
--- a/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java
+++ b/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java
@@ -19,8 +19,8 @@
package org.apache.isis.core.runtime.threadpool;
-import static org.apache.isis.commons.internal.base._NullSafe.isEmpty;
-
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -32,9 +32,12 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import com.google.common.collect.Lists;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +56,9 @@ public final class ThreadPoolSupport implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolSupport.class);
+ private final static int KEEP_ALIVE_TIME_SECS = 5;
+ private final static int QUEUE_CAPACITY = 5000;
+
private final ThreadGroup group;
private final ThreadPoolExecutor concurrentExecutor;
private final ThreadPoolExecutor sequentialExecutor;
@@ -70,24 +76,26 @@ public final class ThreadPoolSupport implements AutoCloseable {
final int corePoolSize = Runtime.getRuntime().availableProcessors();
final int maximumPoolSize = Runtime.getRuntime().availableProcessors();
- final int keepAliveTimeSecs = 5;
-
+
final ThreadFactory threadFactory = (Runnable r) -> new Thread(group, r);
- final int queueCapacity = 25;
- final Supplier<BlockingQueue<Runnable>> workQueueFactory =
- ()->new LinkedBlockingQueue<>(queueCapacity);
+ final Supplier<BlockingQueue<Runnable>> workQueueFactory =
+ ()->new LinkedBlockingQueue<>(QUEUE_CAPACITY);
concurrentExecutor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
- keepAliveTimeSecs, TimeUnit.SECONDS,
+ KEEP_ALIVE_TIME_SECS,
+ TimeUnit.SECONDS,
workQueueFactory.get(),
threadFactory);
- sequentialExecutor = new ThreadPoolExecutor(1, 1, // fixed size = 1
- keepAliveTimeSecs, TimeUnit.MILLISECONDS,
+ sequentialExecutor = new ThreadPoolExecutor(
+ 1,
+ 1,
+ KEEP_ALIVE_TIME_SECS,
+ TimeUnit.SECONDS,
workQueueFactory.get(),
threadFactory);
@@ -117,6 +125,16 @@ public final class ThreadPoolSupport implements AutoCloseable {
}
/**
+ * Executes specified {@code callables} on the default executor.
+ * See {@link ThreadPoolExecutor#invokeAll(java.util.Collection)}
+ * @param callables nullable
+ * @return non-null
+ */
+ public List<Future<Object>> invokeAll(final Callable<Object>... callables) {
+ return invokeAll(Arrays.asList(callables));
+ }
+
+ /**
* Executes specified {@code callables} on the sequential executor in sequence, one by one.
* @param callables nullable
* @return non-null
@@ -124,13 +142,23 @@ public final class ThreadPoolSupport implements AutoCloseable {
public List<Future<Object>> invokeAllSequential(@Nullable final List<Callable<Object>> callables) {
return invokeAll(sequentialExecutor, callables);
}
-
+
+ /**
+ * Executes specified {@code callables} on the sequential executor in sequence, one by one.
+ * @param callables nullable
+ * @return non-null
+ */
+ public List<Future<Object>> invokeAllSequential(final Callable<Object>... callables) {
+ return invokeAllSequential(Arrays.asList(callables));
+ }
+
+
/**
* Waits if necessary for the computation to complete. (Suppresses checked exceptions.)
* @param futures
* @return list of computation results.
*/
- public static List<Object> join(@Nullable final List<Future<Object>> futures) {
+ public List<Object> join(@Nullable final List<Future<Object>> futures) {
if (futures == null) {
return null;
}
@@ -148,31 +176,93 @@ public final class ThreadPoolSupport implements AutoCloseable {
}
}
+ public List<Object> joinGatherFailures(final List<Future<Object>> futures) {
+ if (futures == null) {
+ return null;
+ }
+
+ final long t0 = System.currentTimeMillis();
+ try{
+ final List<Object> returnValues = Lists.newArrayList();
+ for (Future<Object> future : futures) {
+ final Object result;
+ try {
+ result = future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ returnValues.add(result);
+ }
+ return returnValues;
+ } finally {
+ final long t1 = System.currentTimeMillis();
+ LOG.info("join'ing {} tasks: waited {} milliseconds ", futures.size(), (t1-t0));
+ }
+ }
+
+
/**
* Waits if necessary for the computation to complete. (Suppresses checked exceptions.)
* @param future
* @return the computation result
*/
- public static Object join(final Future<Object> future) {
+ public Object join(final Future<Object> future) {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
// ignore
+ return null;
}
- return null;
}
- // -- HELPER
+ // -- HELPERS
private List<Future<Object>> invokeAll(ThreadPoolExecutor executor, @Nullable final List<Callable<Object>> callables) {
if(isEmpty(callables)) {
return Collections.emptyList();
}
try {
- return executor.invokeAll(callables);
+ return executor.invokeAll(timed(executor, callables));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
+ private static List<Callable<Object>> timed(
+ final ThreadPoolExecutor executor,
+ final List<Callable<Object>> callables) {
+ final long queuedAt = System.currentTimeMillis();
+ return callables.stream()
+ .map(__ -> timed(__, executor.getQueue().size(), queuedAt))
+ .collect(Collectors.toList());
+ }
+
+ private static Callable<Object> timed(
+ final Callable<Object> callable,
+ final int queueSize,
+ final long queuedAt) {
+
+ return () -> {
+ final long startedAt = System.currentTimeMillis();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("START: workQueue.size: {}, waited for: {}ms, {}",
+ queueSize,
+ startedAt - queuedAt,
+ callable.toString());
+ }
+ try {
+ return callable.call();
+ } finally {
+ final long completedAt = System.currentTimeMillis();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("END: completed in: {}ms, {}",
+ completedAt - startedAt,
+ callable.toString());
+ }
+ }
+ };
+ }
+
+ private static boolean isEmpty(Collection<?> x) { return x==null || x.size() == 0; }
+
}
diff --git a/core/runtime/src/main/java/org/apache/isis/core/runtime/system/session/IsisSessionFactoryBuilder.java b/core/runtime/src/main/java/org/apache/isis/core/runtime/system/session/IsisSessionFactoryBuilder.java
index 6b95018..32bac23 100644
--- a/core/runtime/src/main/java/org/apache/isis/core/runtime/system/session/IsisSessionFactoryBuilder.java
+++ b/core/runtime/src/main/java/org/apache/isis/core/runtime/system/session/IsisSessionFactoryBuilder.java
@@ -183,7 +183,7 @@ public class IsisSessionFactoryBuilder {
// yet inject.
_Context.putSingleton(IsisSessionFactory.class, isisSessionFactory);
- final List<Callable<Object>> tasks = _Lists.<Callable<Object>>of(
+ final List<Callable<Object>> tasks = _Lists.of(
()->{
// time to initialize...
specificationLoader.init();
@@ -217,7 +217,7 @@ public class IsisSessionFactoryBuilder {
final List<Future<Object>> futures = ThreadPoolSupport.getInstance().invokeAll(tasks);
// wait on this thread for tasks to complete
- ThreadPoolSupport.join(futures);
+ ThreadPoolSupport.getInstance().join(futures);
isisSessionFactory.constructServices();
diff --git a/core/viewer-wicket-impl/src/main/java/org/apache/isis/viewer/wicket/viewer/IsisWicketApplication.java b/core/viewer-wicket-impl/src/main/java/org/apache/isis/viewer/wicket/viewer/IsisWicketApplication.java
index 7f64bcc..73b6c17 100644
--- a/core/viewer-wicket-impl/src/main/java/org/apache/isis/viewer/wicket/viewer/IsisWicketApplication.java
+++ b/core/viewer-wicket-impl/src/main/java/org/apache/isis/viewer/wicket/viewer/IsisWicketApplication.java
@@ -19,8 +19,6 @@
package org.apache.isis.viewer.wicket.viewer;
-import static java.util.Objects.requireNonNull;
-
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collections;
@@ -126,6 +124,7 @@ import de.agilecoders.wicket.core.settings.IBootstrapSettings;
import de.agilecoders.wicket.webjars.WicketWebjars;
import de.agilecoders.wicket.webjars.settings.IWebjarsSettings;
import de.agilecoders.wicket.webjars.settings.WebjarsSettings;
+import static java.util.Objects.requireNonNull;
import net.ftlines.wicketsource.WicketSource;
/**
@@ -401,7 +400,7 @@ implements ComponentFactoryRegistryAccessor, PageClassRegistryAccessor, WicketVi
LOG.error("Failed to initialize", ex);
throw ex;
} finally {
- ThreadPoolSupport.join(futures);
+ ThreadPoolSupport.getInstance().join(futures);
}
}