You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@isis.apache.org by ah...@apache.org on 2019/09/17 16:23:26 UTC
[isis] branch v2 updated: ISIS-2158 deprecats ThreadPoolSupport in
favor of new ConcurrentTaskList
This is an automated email from the ASF dual-hosted git repository.
ahuber pushed a commit to branch v2
in repository https://gitbox.apache.org/repos/asf/isis.git
The following commit(s) were added to refs/heads/v2 by this push:
new 8296ebd ISIS-2158 deprecats ThreadPoolSupport in favor of new ConcurrentTaskList
8296ebd is described below
commit 8296ebd9391847de724daa56d5aaea49bf1ad343
Author: Andi Huber <ah...@apache.org>
AuthorDate: Tue Sep 17 18:23:05 2019 +0200
ISIS-2158 deprecats ThreadPoolSupport in favor of new ConcurrentTaskList
- ConcurrentTaskList (internal API) hopefully provides a simpler and
more flexible solution to concurrent task execution
---
.../isis/applib/mixins/dto/Dto_downloadXsd.java | 2 +-
.../isis/commons/{ => compression}/ZipWriter.java | 2 +-
.../isis/commons/concurrent/AwaitableLatch.java} | 28 ++-
.../internal/concurrent/ConcurrentContext.java | 55 ++++++
.../internal/concurrent/ConcurrentTask.java | 188 ++++++++++++++++++++
.../internal/concurrent/ConcurrentTaskList.java | 187 ++++++++++++++++++++
.../isis/commons/internal/concurrent/_Tasks.java | 190 ---------------------
.../FutureWithIndexIntoFutureOfList.java | 1 +
.../threadpool/ThreadPoolExecutionMode.java | 3 +-
.../internal/threadpool/ThreadPoolSizeAdvisor.java | 1 +
.../internal/threadpool/ThreadPoolSupport.java | 4 +-
.../services/layout/LayoutServiceDefault.java | 2 +-
.../services/registry/ServiceRegistryDefault.java | 7 -
.../specloader/SpecificationLoaderDefault.java | 63 ++-----
.../services/sse/EventStreamServiceDefault.java | 20 +--
.../system/session/IsisSessionFactoryDefault.java | 20 +--
.../transaction/IsisTransactionAspectSupport.java | 7 +-
.../wicket/viewer/IsisWicketApplication.java | 36 ++--
18 files changed, 516 insertions(+), 300 deletions(-)
diff --git a/core/applib/src/main/java/org/apache/isis/applib/mixins/dto/Dto_downloadXsd.java b/core/applib/src/main/java/org/apache/isis/applib/mixins/dto/Dto_downloadXsd.java
index 974b93c..aa70c38 100644
--- a/core/applib/src/main/java/org/apache/isis/applib/mixins/dto/Dto_downloadXsd.java
+++ b/core/applib/src/main/java/org/apache/isis/applib/mixins/dto/Dto_downloadXsd.java
@@ -34,7 +34,7 @@ import org.apache.isis.applib.mixins.MixinConstants;
import org.apache.isis.applib.services.jaxb.JaxbService;
import org.apache.isis.applib.services.message.MessageService;
import org.apache.isis.applib.value.BlobClobFactory;
-import org.apache.isis.commons.ZipWriter;
+import org.apache.isis.commons.compression.ZipWriter;
import lombok.RequiredArgsConstructor;
import lombok.val;
diff --git a/core/commons/src/main/java/org/apache/isis/commons/ZipWriter.java b/core/commons/src/main/java/org/apache/isis/commons/compression/ZipWriter.java
similarity index 98%
rename from core/commons/src/main/java/org/apache/isis/commons/ZipWriter.java
rename to core/commons/src/main/java/org/apache/isis/commons/compression/ZipWriter.java
index 13a3d34..595af5a 100644
--- a/core/commons/src/main/java/org/apache/isis/commons/ZipWriter.java
+++ b/core/commons/src/main/java/org/apache/isis/commons/compression/ZipWriter.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.isis.commons;
+package org.apache.isis.commons.compression;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
diff --git a/core/runtime/src/main/java/org/apache/isis/runtime/system/transaction/TransactionLatch.java b/core/commons/src/main/java/org/apache/isis/commons/concurrent/AwaitableLatch.java
similarity index 65%
rename from core/runtime/src/main/java/org/apache/isis/runtime/system/transaction/TransactionLatch.java
rename to core/commons/src/main/java/org/apache/isis/commons/concurrent/AwaitableLatch.java
index a0ad18e..56b1455 100644
--- a/core/runtime/src/main/java/org/apache/isis/runtime/system/transaction/TransactionLatch.java
+++ b/core/commons/src/main/java/org/apache/isis/commons/concurrent/AwaitableLatch.java
@@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.isis.runtime.system.transaction;
+package org.apache.isis.commons.concurrent;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.isis.commons.internal.exceptions._Exceptions;
+
import lombok.RequiredArgsConstructor;
/**
@@ -29,28 +31,36 @@ import lombok.RequiredArgsConstructor;
*
*/
@RequiredArgsConstructor(staticName = "of")
-public final class TransactionLatch {
+public final class AwaitableLatch {
private final CountDownLatch countDownLatch;
- public static TransactionLatch unlocked() {
+ public static AwaitableLatch unlocked() {
return of(new CountDownLatch(0));
}
/**
- * {@link CountDownLatch#await()}
+ * {@link AwaitableLatch#await()}
* @throws InterruptedException
*/
- public void await() throws InterruptedException {
- countDownLatch.await();
+ public void await() {
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ throw _Exceptions.unrecoverable(e);
+ }
}
/**
- * {@link CountDownLatch#await(long, TimeUnit)}
+ * {@link AwaitableLatch#await(long, TimeUnit)}
* @throws InterruptedException
*/
- public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
- return countDownLatch.await(timeout, unit);
+ public boolean await(long timeout, TimeUnit unit) {
+ try {
+ return countDownLatch.await(timeout, unit);
+ } catch (InterruptedException e) {
+ throw _Exceptions.unrecoverable(e);
+ }
}
}
diff --git a/core/commons/src/main/java/org/apache/isis/commons/internal/concurrent/ConcurrentContext.java b/core/commons/src/main/java/org/apache/isis/commons/internal/concurrent/ConcurrentContext.java
new file mode 100644
index 0000000..028b7e1
--- /dev/null
+++ b/core/commons/src/main/java/org/apache/isis/commons/internal/concurrent/ConcurrentContext.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.isis.commons.internal.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+
+import lombok.Builder;
+
+/**
+ * <h1>- internal use only -</h1>
+ *
+ * <p>
+ * <b>WARNING</b>: Do <b>NOT</b> use any of the classes provided by this package! <br/>
+ * These may be changed or removed without notice!
+ * </p>
+ *
+ * @since 2.0
+ */
+@Builder
+public class ConcurrentContext {
+
+ @Builder.Default final ExecutorService executorService = null;
+ @Builder.Default final boolean enableExecutionLogging = true;
+
+ public static ConcurrentContextBuilder forkJoin() {
+ return ConcurrentContext.builder()
+ .executorService(ForkJoinPool.commonPool());
+ }
+
+ public static ConcurrentContextBuilder sequential() {
+ return ConcurrentContext.builder();
+ }
+
+ public boolean shouldRunSequential() {
+ return executorService == null;
+ }
+
+}
diff --git a/core/commons/src/main/java/org/apache/isis/commons/internal/concurrent/ConcurrentTask.java b/core/commons/src/main/java/org/apache/isis/commons/internal/concurrent/ConcurrentTask.java
new file mode 100644
index 0000000..410994e
--- /dev/null
+++ b/core/commons/src/main/java/org/apache/isis/commons/internal/concurrent/ConcurrentTask.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.isis.commons.internal.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+import static org.apache.isis.commons.internal.base._With.requires;
+
+import lombok.Getter;
+import lombok.val;
+
+/**
+ * <h1>- internal use only -</h1>
+ *
+ * <p>
+ * <b>WARNING</b>: Do <b>NOT</b> use any of the classes provided by this package! <br/>
+ * These may be changed or removed without notice!
+ * </p>
+ *
+ * @since 2.0
+ */
+public abstract class ConcurrentTask<T> implements Runnable {
+
+ public static enum State {
+ NOT_STARTED,
+ STARTED,
+ FAILED,
+ COMPLETED
+ }
+
+ public abstract String getName();
+
+ @Getter private State status = State.NOT_STARTED;
+ @Getter private long startedAtNanos;
+ @Getter private long completedAtNanos;
+ @Getter private long failedAtNanos;
+ @Getter private T completedWith;
+ @Getter private Exception failedWith;
+
+ protected synchronized void preCall() {
+ if(startedAtNanos>0L) {
+ val msg = String.format(
+ "Cannot start task '%s' again, was already started before",
+ getName());
+ throw new IllegalStateException(msg);
+ }
+ startedAtNanos = System.nanoTime();
+ status = State.STARTED;
+ }
+
+ protected void postCall(T completedWith, Exception failedWith) {
+ if(failedWith!=null) {
+ this.failedAtNanos = System.nanoTime();
+ this.failedWith = failedWith;
+ this.status = State.FAILED;
+ } else {
+ this.completedAtNanos = System.nanoTime();
+ this.completedWith = completedWith;
+ this.status = State.COMPLETED;
+ }
+ }
+
+ abstract T innerCall() throws Exception;
+
+ @Override
+ public final void run() {
+
+ preCall();
+ try {
+ val completedWith = innerCall();
+ postCall(completedWith, /*failedWith*/ null);
+ } catch (Exception e) {
+ postCall(/*completedWith*/ null, e);
+ }
+
+ }
+
+ @Override
+ public String toString() {
+ return getName();
+ }
+
+ // -- NAMING
+
+ public ConcurrentTask<T> withName(String name) {
+
+ requires(name, "name");
+
+ val delegate = this;
+
+ return new ConcurrentTask<T>() {
+
+ @Override
+ public T innerCall() throws Exception {
+ return delegate.innerCall();
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ };
+
+ }
+
+ public ConcurrentTask<T> withName(Supplier<String> nameSupplier) {
+
+ requires(nameSupplier, "nameSupplier");
+
+ val delegate = this;
+
+ return new ConcurrentTask<T>() {
+
+ @Override
+ public T innerCall() throws Exception {
+ return delegate.innerCall();
+ }
+
+ @Override
+ public String getName() {
+ return nameSupplier.get();
+ }
+
+ };
+
+ }
+
+
+ // -- FACTORIES
+
+ public static ConcurrentTask<Void> of(Runnable runnable) {
+
+ requires(runnable, "runnable");
+
+ return new ConcurrentTask<Void>() {
+
+ @Override
+ public Void innerCall() throws Exception {
+ runnable.run();
+ return null;
+ }
+
+ @Override
+ public String getName() {
+ return runnable.toString();
+ }
+
+ };
+ }
+
+ public static <X> ConcurrentTask<X> of(Callable<X> callable) {
+
+ requires(callable, "callable");
+
+ return new ConcurrentTask<X>() {
+
+ @Override
+ public X innerCall() throws Exception {
+ return callable.call();
+ }
+
+ @Override
+ public String getName() {
+ return callable.toString();
+ }
+
+ };
+ }
+
+}
diff --git a/core/commons/src/main/java/org/apache/isis/commons/internal/concurrent/ConcurrentTaskList.java b/core/commons/src/main/java/org/apache/isis/commons/internal/concurrent/ConcurrentTaskList.java
new file mode 100644
index 0000000..613b312
--- /dev/null
+++ b/core/commons/src/main/java/org/apache/isis/commons/internal/concurrent/ConcurrentTaskList.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.isis.commons.internal.concurrent;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.logging.log4j.Level;
+
+import org.apache.isis.commons.concurrent.AwaitableLatch;
+import org.apache.isis.commons.internal.collections._Lists;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.val;
+import lombok.extern.log4j.Log4j2;
+
+@RequiredArgsConstructor(staticName = "named")
+@Log4j2
+public class ConcurrentTaskList {
+
+ @Getter private final String name;
+
+ private final List<ConcurrentTask<?>> tasks = _Lists.newArrayList();
+ private final AtomicBoolean wasStarted = new AtomicBoolean();
+ private final CountDownLatch countDownLatch = new CountDownLatch(1);
+ private final AwaitableLatch awaitableLatch = AwaitableLatch.of(countDownLatch);
+ private final LongAdder tasksExecuted = new LongAdder();
+ private long executionTimeNanos;
+
+ public List<ConcurrentTask<?>> getTasks() {
+ return Collections.unmodifiableList(tasks);
+ }
+
+ // -- ASSEMBLING
+
+ public ConcurrentTaskList addTask(ConcurrentTask<?> task) {
+ synchronized (tasks) {
+ if(wasStarted.get()) {
+ val msg = "Tasks already started execution, can no longer modify collection of tasks!";
+ throw new IllegalStateException(msg);
+ }
+ tasks.add(task);
+ }
+ return this;
+ }
+
+ public ConcurrentTaskList addTasks(Collection<? extends ConcurrentTask<?>> tasks) {
+ synchronized (this.tasks) {
+ if(wasStarted.get()) {
+ val msg = "Tasks already started execution, can no longer modify collection of tasks!";
+ throw new IllegalStateException(msg);
+ }
+ this.tasks.addAll(tasks);
+ }
+ return this;
+ }
+
+ // -- EXECUTION
+
+ public ConcurrentTaskList submit(ConcurrentContext context) {
+
+ synchronized (tasks) {
+ if(wasStarted.get()) {
+ val msg = "Tasks already started execution, can not start again!";
+ throw new IllegalStateException(msg);
+ }
+ wasStarted.set(true);
+ }
+
+ val t0 = System.nanoTime();
+
+ if(context.shouldRunSequential()) {
+ for(ConcurrentTask<?> task : tasks) {
+ task.run();
+ tasksExecuted.increment();
+ }
+ countDownLatch.countDown();
+ executionTimeNanos = System.nanoTime() - t0;
+ logExecutionSummary(context);
+ return this;
+ }
+
+ val futures = new ArrayList<Future<?>>(tasks.size());
+
+ for(ConcurrentTask<?> task : tasks) {
+ futures.add(context.executorService.submit(task));
+ }
+
+ // now wait for all futures to complete on a separate thread
+
+ val thread = new Thread() {
+
+ @Override
+ public void run() {
+ for(Future<?> future : futures) {
+ try {
+ future.get();
+ tasksExecuted.increment();
+ } catch (InterruptedException | ExecutionException e) {
+ // ignore, continue waiting on tasks
+ }
+
+ }
+ countDownLatch.countDown();
+ executionTimeNanos = System.nanoTime() - t0;
+ logExecutionSummary(context);
+ }
+
+ };
+
+ thread.start();
+
+ return this;
+
+ }
+
+ // -- SYNCHRONICATION
+
+ public AwaitableLatch latch() {
+ return awaitableLatch;
+ }
+
+ public void await() {
+ latch().await();
+ }
+
+ // -- FIELDS/GETTERS
+
+ public Duration getExecutionTime() {
+ return Duration.of(executionTimeNanos, ChronoUnit.NANOS);
+ }
+
+ // -- EXECUTION LOGGING
+
+ private void logExecutionSummary(ConcurrentContext context) {
+ if(!context.enableExecutionLogging) {
+ return;
+ }
+
+ log.printf(Level.INFO,
+ "TaskList[%s] running %d/%d tasks %s, took %.3f milliseconds ",
+ getName(),
+ tasksExecuted.longValue(),
+ tasks.size(),
+ context.shouldRunSequential() ? "sequential" : "concurrent",
+ 0.000_001 * executionTimeNanos);
+
+ }
+
+ // -- SHORTCUTS
+
+ public ConcurrentTaskList addRunnable(String name, Runnable runnable) {
+ return addTask(ConcurrentTask.of(runnable).withName(name));
+ }
+
+ public ConcurrentTaskList submit(ConcurrentContext.ConcurrentContextBuilder contextBuilder) {
+ return submit(contextBuilder.build());
+ }
+
+
+}
diff --git a/core/commons/src/main/java/org/apache/isis/commons/internal/concurrent/_Tasks.java b/core/commons/src/main/java/org/apache/isis/commons/internal/concurrent/_Tasks.java
deleted file mode 100644
index 841ece8..0000000
--- a/core/commons/src/main/java/org/apache/isis/commons/internal/concurrent/_Tasks.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.isis.commons.internal.concurrent;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.LongAdder;
-import java.util.function.Supplier;
-
-import javax.annotation.Nullable;
-
-import org.apache.logging.log4j.Level;
-
-import org.apache.isis.commons.internal.base._Either;
-import org.apache.isis.commons.internal.collections._Lists;
-import org.apache.isis.commons.internal.exceptions._Exceptions;
-
-import static org.apache.isis.commons.internal.base._With.mapIfPresentElse;
-import static org.apache.isis.commons.internal.base._With.requires;
-
-import lombok.RequiredArgsConstructor;
-import lombok.val;
-import lombok.extern.log4j.Log4j2;
-
-/**
- * <h1>- internal use only -</h1>
- * <p>
- * Framework internal concurrency support.
- * <p>
- * <b>WARNING</b>: Do <b>NOT</b> use any of the classes provided by this package! <br/>
- * These may be changed or removed without notice!
- *
- * @since 2.0
- */
-@Log4j2
-public final class _Tasks {
-
- public static _Tasks create() {
- return new _Tasks();
- }
-
- public void addRunnable(Runnable runnable) {
- requires(runnable, "runnable");
- addRunnable(runnable, null);
- }
-
- public void addRunnable(Runnable runnable, @Nullable Supplier<String> name) {
- requires(runnable, "runnable");
- callables.add(new NamedCallable<Object>(name) {
-
- @Override
- public Void call() throws Exception {
- runnable.run();
- return null;
- }
-
- });
- }
-
- public void addRunnable(String name, Runnable runnable) {
- requires(runnable, "runnable");
- addRunnable(runnable, ()->name);
- }
-
- public List<Callable<Object>> getCallables() {
- return Collections.unmodifiableList(callables);
- }
-
- public void invokeAndWait(boolean concurrent) {
-
- val t0 = System.nanoTime();
- val tasksExecuted = new LongAdder();
-
- try {
-
- if(concurrent) {
-
- // val forkJoinPool = new ForkJoinPool();
- // val anyErrorRef = new AtomicReference<RuntimeException>();
- //
- // forkJoinPool.submit(()->{
- val anyError = callables.parallelStream()
- .map(_Tasks::call)
- .peek(__->tasksExecuted.increment())
- .filter(_Either::isRight)
- .findAny()
- .map(_Either::rightIfAny)
- .orElse(null);
-
- // anyErrorRef.set(anyError);
- // });
- //
- // forkJoinPool.shutdown();
- //
- // try {
- // System.err.println("wait for ForkJoinPool " + forkJoinPool);
- // forkJoinPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
- // System.err.println("done waiting for ForkJoinPool " + forkJoinPool);
- // } catch (InterruptedException e) {
- // throw _Exceptions.unrecoverable("exception while waiting on the ForkJoinPool to terminate", e);
- // }
- //
- // val anyError = anyErrorRef.get();
- if(anyError!=null) {
- throw anyError;
- }
-
- } else {
-
- for(Callable<?> callable : getCallables()) {
- val eitherResultOrError = call(callable);
- tasksExecuted.increment();
-
- if(eitherResultOrError.isRight()) {
- throw eitherResultOrError.rightIfAny();
- }
- }
-
- }
-
- } finally {
-
- if(log.isDebugEnabled()) {
- val t1 = System.nanoTime();
- log.printf(Level.DEBUG,
- "running %d/%d tasks %s, took %.3f milliseconds ",
- tasksExecuted.longValue(),
- callables.size(),
- concurrent ? "concurrent" : "sequential",
- 0.000_001 * (t1-t0));
- }
-
- callables.clear();
- }
-
- }
-
- // -- IMPLEMENTATION DETAILS
-
- private static <T> _Either<T, RuntimeException> call(Callable<T> callable) {
-
- try {
- val result = callable.call();
- return _Either.leftNullable(result);
- } catch (Throwable cause) {
-
- val name = callable instanceof NamedCallable
- ? callable.toString()
- : "unnamend";
-
- val msg = String.format("failure while executing callable '%s'", name);
- log.error(msg, cause);
- return _Either.right(_Exceptions.unrecoverable(msg, cause));
- }
-
- }
-
- private final List<Callable<Object>> callables = _Lists.newArrayList();
-
- @RequiredArgsConstructor
- private abstract static class NamedCallable<T> implements Callable<T> {
-
- private final Supplier<String> name;
-
- @Override
- public String toString() {
- return mapIfPresentElse(name, Supplier::get, super.toString());
- }
-
- }
-
-
-}
diff --git a/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/FutureWithIndexIntoFutureOfList.java b/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/FutureWithIndexIntoFutureOfList.java
index 54cda52..584e31b 100644
--- a/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/FutureWithIndexIntoFutureOfList.java
+++ b/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/FutureWithIndexIntoFutureOfList.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+@Deprecated
final class FutureWithIndexIntoFutureOfList<T> implements Future<T> {
final Future<List<T>> commonFuture;
final int index;
diff --git a/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/ThreadPoolExecutionMode.java b/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/ThreadPoolExecutionMode.java
index 2cee9b5..442eb0e 100644
--- a/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/ThreadPoolExecutionMode.java
+++ b/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/ThreadPoolExecutionMode.java
@@ -21,7 +21,8 @@ package org.apache.isis.commons.internal.threadpool;
/**
* ThreadPollSupport's executions mode where the enum's ordinal corresponds to the level of concurrency.
*/
-public enum ThreadPoolExecutionMode {
+@Deprecated
+enum ThreadPoolExecutionMode {
/**
* Wraps submitted tasks into a single task, which is then executed within the context
diff --git a/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/ThreadPoolSizeAdvisor.java b/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/ThreadPoolSizeAdvisor.java
index ddc8fe1..86730dc 100644
--- a/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/ThreadPoolSizeAdvisor.java
+++ b/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/ThreadPoolSizeAdvisor.java
@@ -19,6 +19,7 @@
package org.apache.isis.commons.internal.threadpool;
+@Deprecated
final class ThreadPoolSizeAdvisor {
/*
diff --git a/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/ThreadPoolSupport.java b/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/ThreadPoolSupport.java
index 677c3d5..369a11f 100644
--- a/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/ThreadPoolSupport.java
+++ b/core/commons/src/main/java/org/apache/isis/commons/internal/threadpool/ThreadPoolSupport.java
@@ -58,9 +58,11 @@ import lombok.extern.log4j.Log4j2;
* Implementation Note: ThreadPoolSupport::close is triggered by _Context.clear()
* when application shuts down.
*
+ * @deprecated in favor of the {@code org.apache.isis.commons.internal.concurrent} API
*/
+@Deprecated
@Log4j2
-public final class ThreadPoolSupport implements AutoCloseable {
+final class ThreadPoolSupport implements AutoCloseable {
public static ThreadPoolExecutionMode HIGHEST_CONCURRENCY_EXECUTION_MODE_ALLOWED =
ThreadPoolExecutionMode.PARALLEL;
diff --git a/core/metamodel/src/main/java/org/apache/isis/metamodel/services/layout/LayoutServiceDefault.java b/core/metamodel/src/main/java/org/apache/isis/metamodel/services/layout/LayoutServiceDefault.java
index 783906a..6854211 100644
--- a/core/metamodel/src/main/java/org/apache/isis/metamodel/services/layout/LayoutServiceDefault.java
+++ b/core/metamodel/src/main/java/org/apache/isis/metamodel/services/layout/LayoutServiceDefault.java
@@ -32,7 +32,7 @@ import org.apache.isis.applib.services.grid.GridService;
import org.apache.isis.applib.services.jaxb.JaxbService;
import org.apache.isis.applib.services.layout.LayoutService;
import org.apache.isis.applib.services.menu.MenuBarsService;
-import org.apache.isis.commons.ZipWriter;
+import org.apache.isis.commons.compression.ZipWriter;
import org.apache.isis.commons.internal.collections._Lists;
import org.apache.isis.commons.internal.collections._Maps;
import org.apache.isis.metamodel.facets.object.grid.GridFacet;
diff --git a/core/metamodel/src/main/java/org/apache/isis/metamodel/services/registry/ServiceRegistryDefault.java b/core/metamodel/src/main/java/org/apache/isis/metamodel/services/registry/ServiceRegistryDefault.java
index 4e8ff38..754f8c7 100644
--- a/core/metamodel/src/main/java/org/apache/isis/metamodel/services/registry/ServiceRegistryDefault.java
+++ b/core/metamodel/src/main/java/org/apache/isis/metamodel/services/registry/ServiceRegistryDefault.java
@@ -43,8 +43,6 @@ import org.apache.isis.commons.internal.context._Context;
import org.apache.isis.commons.internal.ioc.BeanAdapter;
import org.apache.isis.commons.internal.ioc.spring._Spring;
import org.apache.isis.commons.internal.reflection._Reflect;
-import org.apache.isis.commons.internal.threadpool.ThreadPoolExecutionMode;
-import org.apache.isis.commons.internal.threadpool.ThreadPoolSupport;
import org.apache.isis.config.IsisConfiguration;
import org.apache.isis.config.internal._Config;
import org.apache.isis.config.registry.IsisBeanTypeRegistry;
@@ -61,11 +59,6 @@ public final class ServiceRegistryDefault implements ServiceRegistry, Applicatio
@Override
public void setApplicationContext(ApplicationContext springContext) throws BeansException {
- // disables concurrent Spec-Loading
- ThreadPoolSupport.HIGHEST_CONCURRENCY_EXECUTION_MODE_ALLOWED =
- ThreadPoolExecutionMode.SEQUENTIAL_WITHIN_CALLING_THREAD;
-
-
// ensures a well defined precondition
{
val beanTypeRegistry = _Context.getIfAny(IsisBeanTypeRegistry.class);
diff --git a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoaderDefault.java b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoaderDefault.java
index fa17b63..351ffec 100644
--- a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoaderDefault.java
+++ b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoaderDefault.java
@@ -28,7 +28,7 @@ import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.isis.commons.internal.collections._Lists;
-import org.apache.isis.commons.internal.concurrent._Tasks;
+import org.apache.isis.commons.internal.concurrent.ConcurrentTaskList;
import org.apache.isis.commons.internal.context._Context;
import org.apache.isis.config.IsisConfiguration;
import org.apache.isis.config.internal._Config;
@@ -349,11 +349,6 @@ public class SpecificationLoaderDefault implements SpecificationLoader {
return cache.allSpecifications();
}
- // private Stream<BeanAdapter> streamBeans() {
- // final ServiceRegistry registry = MetaModelContext.current().getServiceRegistry();
- // return registry.streamRegisteredBeans();
- // }
-
/**
* Creates the appropriate type of {@link ObjectSpecification}.
*/
@@ -420,50 +415,26 @@ public class SpecificationLoaderDefault implements SpecificationLoader {
private void introspect(final Collection<ObjectSpecification> specs, final IntrospectionState upTo) {
- val tasks = _Tasks.create();
-
- for (final ObjectSpecification specification : specs) {
-
- val specSpi = (ObjectSpecificationAbstract) specification;
-
- tasks.addRunnable(
- ()->specSpi.introspectUpTo(upTo),
- ()->String.format(
- "%s: #introspectUpTo( %s )",
- specification.getFullIdentifier(), upTo));
+ val isConcurrentFromConfig = (boolean) CONFIG_PROPERTY_PARALLELIZE.from(getConfiguration());
+
+ val runSequential = !isConcurrentFromConfig || true; //FIXME concurrent specloading disabled, it deadlocks
+
+ if(runSequential) {
+
+ for (final ObjectSpecification specification : specs) {
+ val specSpi = (ObjectSpecificationAbstract) specification;
+ specSpi.introspectUpTo(upTo);
+ }
+
+ return;
}
-
- val isConcurrent = (boolean) CONFIG_PROPERTY_PARALLELIZE.from(getConfiguration());
-
- tasks.invokeAndWait(isConcurrent && false); //FIXME concurrent init is broken
+
+ specs.parallelStream()
+ .map(spec -> (ObjectSpecificationAbstract) spec)
+ .forEach(spec -> spec.introspectUpTo(upTo));
}
- // private List<ObjectSpecification> loadSpecificationsFor(
- // final Stream<Class<?>> domainTypes,
- // final List<ObjectSpecification> appendTo,
- // final IntrospectionState upTo) {
- //
- // return domainTypes
- // .map(domainType->internalLoadSpecification(domainType, upTo))
- // .filter(_NullSafe::isPresent)
- // .peek(appendTo::add)
- // .collect(Collectors.toList());
- // }
- //
- // private List<ObjectSpecification> loadSpecificationsForBeans (
- // final Stream<BeanAdapter> beans,
- // final List<ObjectSpecification> appendTo,
- // final IntrospectionState upTo) {
- //
- // return beans
- // .map(BeanAdapter::getBeanClass)
- // .map(type->internalLoadSpecification(type, upTo))
- // .filter(_NullSafe::isPresent)
- // .peek(appendTo::add)
- // .collect(Collectors.toList());
- // }
-
private void invalidateCache(final Class<?> cls) {
if(!cache.isInitialized()) {
diff --git a/core/runtime-services/src/main/java/org/apache/isis/runtime/services/sse/EventStreamServiceDefault.java b/core/runtime-services/src/main/java/org/apache/isis/runtime/services/sse/EventStreamServiceDefault.java
index eb16350..7bfbbdf 100644
--- a/core/runtime-services/src/main/java/org/apache/isis/runtime/services/sse/EventStreamServiceDefault.java
+++ b/core/runtime-services/src/main/java/org/apache/isis/runtime/services/sse/EventStreamServiceDefault.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
import java.util.function.Predicate;
import javax.inject.Inject;
@@ -38,7 +39,6 @@ import org.apache.isis.applib.events.sse.EventStreamService;
import org.apache.isis.applib.events.sse.EventStreamSource;
import org.apache.isis.applib.services.xactn.TransactionService;
import org.apache.isis.commons.internal.collections._Lists;
-import org.apache.isis.commons.internal.threadpool.ThreadPoolSupport;
import org.apache.isis.runtime.system.context.IsisContext;
import org.apache.isis.runtime.system.transaction.IsisTransactionAspectSupport;
@@ -73,12 +73,12 @@ public class EventStreamServiceDefault implements EventStreamService {
Objects.requireNonNull(task);
Objects.requireNonNull(executionBehavior);
-
- val threadPool = ThreadPoolSupport.getInstance();
+
+ val executor = ForkJoinPool.commonPool();
switch(executionBehavior) {
case SIMPLE:
- CompletableFuture.runAsync(()->run(task), threadPool.getExecutor());
+ CompletableFuture.runAsync(()->run(task), executor);
return;
case REQUIRES_NEW_SESSION:
break; // fall through
@@ -90,19 +90,11 @@ public class EventStreamServiceDefault implements EventStreamService {
CompletableFuture.runAsync(()->{
// wait for calling thread to commit its current transaction
- try {
- callingThread_TransactionLatch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ callingThread_TransactionLatch.await();
IsisContext.getSessionFactory().doInSession(()->run(task));
- }, threadPool.getExecutor());
-
- // ForkJoinPool.commonPool().submit(()->{
- //
- // });
+ }, executor);
}
diff --git a/core/runtime/src/main/java/org/apache/isis/runtime/system/session/IsisSessionFactoryDefault.java b/core/runtime/src/main/java/org/apache/isis/runtime/system/session/IsisSessionFactoryDefault.java
index 257adb0..f68eeab 100644
--- a/core/runtime/src/main/java/org/apache/isis/runtime/system/session/IsisSessionFactoryDefault.java
+++ b/core/runtime/src/main/java/org/apache/isis/runtime/system/session/IsisSessionFactoryDefault.java
@@ -37,7 +37,8 @@ import org.apache.isis.applib.services.title.TitleService;
import org.apache.isis.commons.collections.Bin;
import org.apache.isis.commons.internal.base._Blackhole;
import org.apache.isis.commons.internal.collections._Sets;
-import org.apache.isis.commons.internal.concurrent._Tasks;
+import org.apache.isis.commons.internal.concurrent.ConcurrentContext;
+import org.apache.isis.commons.internal.concurrent.ConcurrentTaskList;
import org.apache.isis.commons.internal.context._Context;
import org.apache.isis.commons.internal.ioc.BeanAdapter;
import org.apache.isis.commons.internal.ioc.BeanSort;
@@ -74,7 +75,6 @@ import lombok.extern.log4j.Log4j2;
@Singleton @Log4j2
public class IsisSessionFactoryDefault implements IsisSessionFactory {
- @Inject private IsisConfiguration configuration;
@Inject private ServiceRegistry serviceRegistry;
@Inject private AuthenticationManager authenticationManager;
@Inject private RuntimeEventService runtimeEventService;
@@ -109,9 +109,9 @@ public class IsisSessionFactoryDefault implements IsisSessionFactory {
runtimeEventService.fireAppPreMetamodel();
- val tasks = _Tasks.create();
-
- tasks.addRunnable("SpecificationLoader.init()", ()->{
+ val taskList = ConcurrentTaskList.named("IsisSessionFactoryDefault Concurrent Tasks");
+
+ taskList.addRunnable("SpecificationLoader.init()", ()->{
// time to initialize...
specificationLoader.init();
@@ -136,12 +136,12 @@ public class IsisSessionFactoryDefault implements IsisSessionFactory {
authorizationManager.init();
});
- tasks.addRunnable("ChangesDtoUtils.init()", ChangesDtoUtils::init);
- tasks.addRunnable("InteractionDtoUtils.init()", InteractionDtoUtils::init);
- tasks.addRunnable("CommandDtoUtils.init()", CommandDtoUtils::init);
+ taskList.addRunnable("ChangesDtoUtils.init()", ChangesDtoUtils::init);
+ taskList.addRunnable("InteractionDtoUtils.init()", InteractionDtoUtils::init);
+ taskList.addRunnable("CommandDtoUtils.init()", CommandDtoUtils::init);
- val concurrentInit = false; // otherwise deadlocks
- tasks.invokeAndWait(concurrentInit);
+ taskList.submit(ConcurrentContext.sequential());
+ taskList.await();
runtimeEventService.fireAppPostMetamodel();
diff --git a/core/runtime/src/main/java/org/apache/isis/runtime/system/transaction/IsisTransactionAspectSupport.java b/core/runtime/src/main/java/org/apache/isis/runtime/system/transaction/IsisTransactionAspectSupport.java
index 1dafdd2..55afd1d 100644
--- a/core/runtime/src/main/java/org/apache/isis/runtime/system/transaction/IsisTransactionAspectSupport.java
+++ b/core/runtime/src/main/java/org/apache/isis/runtime/system/transaction/IsisTransactionAspectSupport.java
@@ -20,6 +20,7 @@ package org.apache.isis.runtime.system.transaction;
import java.util.Optional;
+import org.apache.isis.commons.concurrent.AwaitableLatch;
import org.apache.isis.commons.internal.context._Context;
public final class IsisTransactionAspectSupport {
@@ -44,11 +45,11 @@ public final class IsisTransactionAspectSupport {
.orElse(false);
}
- public static TransactionLatch transactionLatch() {
+ public static AwaitableLatch transactionLatch() {
return currentTransactionObject()
.map(IsisTransactionObject::getCountDownLatch)
- .map(TransactionLatch::of)
- .orElseGet(TransactionLatch::unlocked);
+ .map(AwaitableLatch::of)
+ .orElseGet(AwaitableLatch::unlocked);
}
}
diff --git a/core/viewer-wicket-impl/src/main/java/org/apache/isis/viewer/wicket/viewer/IsisWicketApplication.java b/core/viewer-wicket-impl/src/main/java/org/apache/isis/viewer/wicket/viewer/IsisWicketApplication.java
index 8bd8287..b6dd5b5 100644
--- a/core/viewer-wicket-impl/src/main/java/org/apache/isis/viewer/wicket/viewer/IsisWicketApplication.java
+++ b/core/viewer-wicket-impl/src/main/java/org/apache/isis/viewer/wicket/viewer/IsisWicketApplication.java
@@ -19,14 +19,10 @@
package org.apache.isis.viewer.wicket.viewer;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.function.Function;
import javax.inject.Inject;
@@ -64,9 +60,12 @@ import org.apache.wicket.spring.injection.annot.SpringComponentInjector;
import org.apache.wicket.util.time.Duration;
import org.wicketstuff.select2.ApplicationSettings;
+import org.apache.isis.commons.internal.collections._Lists;
+import org.apache.isis.commons.internal.concurrent.ConcurrentContext;
+import org.apache.isis.commons.internal.concurrent.ConcurrentTask;
+import org.apache.isis.commons.internal.concurrent.ConcurrentTaskList;
import org.apache.isis.commons.internal.context._Context;
import org.apache.isis.commons.internal.resources._Resources;
-import org.apache.isis.commons.internal.threadpool.ThreadPoolSupport;
import org.apache.isis.config.IsisConfiguration;
import org.apache.isis.config.internal._Config;
import org.apache.isis.metamodel.adapter.ObjectAdapter;
@@ -230,11 +229,12 @@ implements ComponentFactoryRegistryAccessor, PageClassRegistryAccessor, WicketVi
val serviceRegistry = IsisContext.getServiceRegistry();
//val serviceInjector = IsisContext.getServiceInjector();
- List<Future<Object>> futures = null;
+ val backgroundInitializationTasks = createBackgroundInitializationTasks();
+
try {
super.init();
- futures = startBackgroundInitializationThreads();
+ backgroundInitializationTasks.submit(ConcurrentContext.sequential());
getRequestCycleSettings().setRenderStrategy(RequestCycleSettings.RenderStrategy.REDIRECT_TO_RENDER);
getResourceSettings().setParentFolderPlaceholder("$up$");
@@ -320,13 +320,13 @@ implements ComponentFactoryRegistryAccessor, PageClassRegistryAccessor, WicketVi
log.debug("storeSettings.maxSizePerSession : {}", getStoreSettings().getMaxSizePerSession());
log.debug("storeSettings.fileStoreFolder : {}", getStoreSettings().getFileStoreFolder());
+ backgroundInitializationTasks.await();
+
} catch(RuntimeException ex) {
// because Wicket's handling in its WicketFilter (that calls this method) does not log the exception.
log.error("Failed to initialize", ex);
throw ex;
- } finally {
- ThreadPoolSupport.getInstance().join(futures);
- }
+ }
acceptIfPresent(IsisWicketThemeSupport.getInstance(), themeSupport->{
IBootstrapSettings settings = Bootstrap.getSettings();
@@ -335,12 +335,16 @@ implements ComponentFactoryRegistryAccessor, PageClassRegistryAccessor, WicketVi
}
- protected List<Future<Object>> startBackgroundInitializationThreads() {
- return ThreadPoolSupport.getInstance().invokeAll(Arrays.asList(
- Executors.callable(this::configureWebJars),
- Executors.callable(this::configureWicketBootstrap),
- Executors.callable(this::configureWicketSelect2)
- ));
+ protected ConcurrentTaskList createBackgroundInitializationTasks() {
+
+ val tasks = _Lists.of(
+ ConcurrentTask.of(this::configureWebJars).withName("Configure WebJars"),
+ ConcurrentTask.of(this::configureWicketBootstrap).withName("Configure WicketBootstrap"),
+ ConcurrentTask.of(this::configureWicketSelect2).withName("Configure WicketSelect2")
+ );
+
+ return ConcurrentTaskList.named("Isis Application Background Initialization Tasks")
+ .addTasks(tasks);
}
/**