You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@isis.apache.org by ah...@apache.org on 2018/09/21 09:11:18 UTC
[isis] branch v2 updated: ISIS-1974: adds tests for
ThreadPoolSupport's sequential execution
This is an automated email from the ASF dual-hosted git repository.
ahuber pushed a commit to branch v2
in repository https://gitbox.apache.org/repos/asf/isis.git
The following commit(s) were added to refs/heads/v2 by this push:
new e4d6dbf ISIS-1974: adds tests for ThreadPoolSupport's sequential execution
e4d6dbf is described below
commit e4d6dbf3d7505ee400053ed9cb5a819f548fdc07
Author: Andi Huber <ah...@apache.org>
AuthorDate: Fri Sep 21 11:11:11 2018 +0200
ISIS-1974: adds tests for ThreadPoolSupport's sequential execution
Task-Url: https://issues.apache.org/jira/browse/ISIS-1974
---
.../FutureWithIndexIntoFutureOfList.java | 62 ++++++++++++
.../core/runtime/threadpool/ThreadPoolSupport.java | 58 +++++++----
.../runtime/threadpool/ThreadPoolSupportTest.java | 108 +++++++++++++++++++++
3 files changed, 208 insertions(+), 20 deletions(-)
diff --git a/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/FutureWithIndexIntoFutureOfList.java b/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/FutureWithIndexIntoFutureOfList.java
new file mode 100644
index 0000000..d6ca094
--- /dev/null
+++ b/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/FutureWithIndexIntoFutureOfList.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.isis.core.runtime.threadpool;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A {@link Future} view onto a single element of the list produced by a shared
+ * {@code Future<List<T>>}.
+ * <p>
+ * Every operation is delegated to the shared future, so cancelling through one view
+ * cancels the shared future itself; the {@code get} methods additionally pick the
+ * element stored at this view's index.
+ */
+final class FutureWithIndexIntoFutureOfList<T> implements Future<T> {
+    final Future<List<T>> commonFuture;
+    final int index;
+
+    /**
+     * @param commonFuture the shared future yielding the list of all results
+     * @param index position of this view's result within that list
+     */
+    FutureWithIndexIntoFutureOfList(Future<List<T>> commonFuture, int index) {
+        this.commonFuture = commonFuture;
+        this.index = index;
+    }
+
+    /** Forwards the cancellation request to the shared future. */
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        final boolean cancelled = commonFuture.cancel(mayInterruptIfRunning);
+        return cancelled;
+    }
+
+    /** Reports whether the shared future was cancelled. */
+    @Override
+    public boolean isCancelled() {
+        final boolean cancelled = commonFuture.isCancelled();
+        return cancelled;
+    }
+
+    /** Reports whether the shared future has completed. */
+    @Override
+    public boolean isDone() {
+        final boolean done = commonFuture.isDone();
+        return done;
+    }
+
+    /** Waits for the shared future, then returns the element at this view's index. */
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        final List<T> allResults = commonFuture.get();
+        return allResults.get(index);
+    }
+
+    /**
+     * Waits at most the given time for the shared future, then returns the element
+     * at this view's index.
+     */
+    @Override
+    public T get(long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final List<T> allResults = commonFuture.get(timeout, unit);
+        return allResults.get(index);
+    }
+}
\ No newline at end of file
diff --git a/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java b/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java
index e900c74..e6d30c7 100644
--- a/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java
+++ b/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java
@@ -19,6 +19,11 @@
package org.apache.isis.core.runtime.threadpool;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toList;
+import static org.apache.isis.commons.internal.base._Casts.uncheckedCast;
+
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -33,12 +38,14 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.isis.commons.internal.base._NullSafe;
import org.apache.isis.commons.internal.collections._Lists;
import org.apache.isis.commons.internal.context._Context;
@@ -59,7 +66,6 @@ public final class ThreadPoolSupport implements AutoCloseable {
private final ThreadGroup group;
private final ThreadPoolExecutor concurrentExecutor;
- private final ThreadPoolExecutor sequentialExecutor;
/**
* @return the application-scoped singleton ThreadPoolSupport instance
@@ -68,7 +74,7 @@ public final class ThreadPoolSupport implements AutoCloseable {
return _Context.computeIfAbsent(ThreadPoolSupport.class, __-> new ThreadPoolSupport());
}
- private ThreadPoolSupport() {
+ ThreadPoolSupport() {
group = new ThreadGroup(ThreadPoolSupport.class.getName());
@@ -87,14 +93,6 @@ public final class ThreadPoolSupport implements AutoCloseable {
workQueueFactory.get(),
threadFactory);
- sequentialExecutor = new ThreadPoolExecutor(
- 1,
- 1,
- KEEP_ALIVE_TIME_SECS,
- TimeUnit.SECONDS,
- workQueueFactory.get(),
- threadFactory);
-
}
/*
@@ -102,12 +100,7 @@ public final class ThreadPoolSupport implements AutoCloseable {
*/
@Override
public void close() throws Exception {
- try {
- concurrentExecutor.shutdown();
- } finally {
- // in case the previous throws, continue execution here
- sequentialExecutor.shutdown();
- }
+ concurrentExecutor.shutdown(); // orderly shutdown: no new tasks are accepted; does not wait for submitted tasks to finish
}
/**
@@ -132,16 +125,26 @@ public final class ThreadPoolSupport implements AutoCloseable {
}
/**
- * Executes specified {@code callables} on the sequential executor in sequence, one by one.
+ * Wraps specified {@code callables} into a single task, which is then executed on the default executor.
+ * The single task itself executes specified {@code callables} in sequence, one by one.
* @param callables nullable
* @return non-null
*/
public List<Future<Object>> invokeAllSequential(@Nullable final List<Callable<Object>> callables) {
- return invokeAll(sequentialExecutor, callables);
+ if(_NullSafe.isEmpty(callables)) { // nothing to run (callables is documented as nullable)
+ return emptyList();
+ }
+ final Future<List<Object>> commonFuture = // all callables are bundled into one task, hence exactly one future to fetch
+ uncheckedCast(invokeAll(singletonList(toSingleTask(callables))).get(0));
+
+ return IntStream.range(0, callables.size())
+ .mapToObj(index->new FutureWithIndexIntoFutureOfList<Object>(commonFuture, index)) // one view per callable, reading its own slot of the shared result list
+ .collect(toList());
}
/**
- * Executes specified {@code callables} on the sequential executor in sequence, one by one.
+ * Wraps specified {@code callables} into a single task, which is then executed on the default executor.
+ * The single task itself executes specified {@code callables} in sequence, one by one.
* @param callables nullable
* @return non-null
*/
@@ -212,6 +215,11 @@ public final class ThreadPoolSupport implements AutoCloseable {
return null;
}
}
+
+ /**
+  * @return the string representation of the underlying concurrent executor
+  */
+ @Override
+ public String toString() {
+     final String executorDescription = concurrentExecutor.toString();
+     return executorDescription;
+ }
// -- HELPERS
@@ -231,7 +239,7 @@ public final class ThreadPoolSupport implements AutoCloseable {
final List<Callable<Object>> callables) {
final long queuedAt = System.currentTimeMillis();
return callables.stream()
- .map(__ -> timed(__, executor.getQueue().size(), queuedAt))
+ .map(callable -> timed(callable, executor.getQueue().size(), queuedAt))
.collect(Collectors.toList());
}
@@ -261,6 +269,16 @@ public final class ThreadPoolSupport implements AutoCloseable {
};
}
+ /**
+  * Bundles the given {@code callables} into one task that calls them in list order
+  * and collects their results into a list in that same order.
+  * <p>
+  * If any callable throws, the exception leaves the bundled task right away: the
+  * remaining callables are not invoked and no partial result list is returned.
+  *
+  * @param callables the callables to run one after another
+  * @return a task yielding the list of results (typed as {@code Object})
+  */
+ private Callable<Object> toSingleTask(final List<Callable<Object>> callables) {
+     return () -> {
+         final List<Object> results = _Lists.newArrayList();
+         for(final Callable<Object> task : callables) {
+             final Object outcome = task.call(); // a thrown exception aborts the whole sequence
+             results.add(outcome);
+         }
+         return results;
+     };
+ }
+
private static boolean isEmpty(Collection<?> x) { return x==null || x.size() == 0; }
}
diff --git a/core/metamodel/src/test/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupportTest.java b/core/metamodel/src/test/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupportTest.java
new file mode 100644
index 0000000..655f572
--- /dev/null
+++ b/core/metamodel/src/test/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupportTest.java
@@ -0,0 +1,108 @@
+package org.apache.isis.core.runtime.threadpool;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.isis.commons.internal.collections._Lists;
+
+/**
+ * Tests that {@link ThreadPoolSupport#invokeAllSequential(List)} runs its callables
+ * strictly in list order and hands back their results in that same order.
+ */
+class ThreadPoolSupportTest {
+
+    /** The sequence every batch of tasks must append and return, in this order. */
+    private static final String EXPECTED_SEQUENCE = "ABCDEFGHIJ";
+
+    /** Number of times the batch of tasks is submitted. */
+    private static final int ROUNDS = 256;
+
+    private ThreadPoolSupport pool;
+    private Buffer buffer;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        pool = new ThreadPoolSupport();
+        buffer = new Buffer();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        pool.close();
+    }
+
+    /** Sanity check of the {@link Buffer} helper itself, without any pool involvement. */
+    @Test
+    void shouldPreserveSequence_whenNoThread() {
+
+        System.out.println(""+pool);
+
+        buffer.append("A");
+        buffer.append("B");
+        buffer.append("C");
+        Assertions.assertEquals("ABC", buffer.toString());
+    }
+
+    /**
+     * Submits the same ten tasks {@link #ROUNDS} times and verifies that both the
+     * side effects (appends to the buffer) and the returned results form the
+     * expected sequence, repeated once per round.
+     */
+    @Test
+    void shouldPreserveSequence_whenSequentialExecution() {
+
+        final List<Future<Object>> futures = _Lists.newArrayList();
+
+        final List<Callable<Object>> tasks = _Lists.of(
+                ()->{buffer.append("A"); return "A";},
+                ()->{buffer.append("B"); return "B";},
+                ()->{buffer.append("C"); return "C";},
+                ()->{buffer.append("D"); return "D";},
+                ()->{buffer.append("E"); return "E";},
+                ()->{buffer.append("F"); return "F";},
+                ()->{buffer.append("G"); return "G";},
+                ()->{buffer.append("H"); return "H";},
+                ()->{buffer.append("I"); return "I";},
+                ()->{buffer.append("J"); return "J";}
+                );
+
+        for(int i=0; i<ROUNDS; ++i) {
+            final List<Future<Object>> taskFutures = pool.invokeAllSequential(tasks);
+            pool.joinGatherFailures(taskFutures);
+            futures.addAll(taskFutures);
+        }
+
+        final String resultString = futures.stream()
+                .map(ThreadPoolSupportTest::resultOrFailureText)
+                .collect(Collectors.joining());
+
+        final String bufferString = buffer.toString();
+
+        // Guards against a vacuous pass: stripping the expected sequence from an
+        // empty (or too short) string also yields "", so the total length is
+        // checked in addition to the "nothing but whole sequences" check below.
+        final int expectedLength = ROUNDS * EXPECTED_SEQUENCE.length();
+        Assertions.assertEquals(expectedLength, bufferString.length());
+        Assertions.assertEquals(expectedLength, resultString.length());
+
+        final String buffer_allowedSequencesRemoved = bufferString.replace(EXPECTED_SEQUENCE, "");
+        final String result_allowedSequencesRemoved = resultString.replace(EXPECTED_SEQUENCE, "");
+
+        Assertions.assertEquals("", buffer_allowedSequencesRemoved);
+        Assertions.assertEquals("", result_allowedSequencesRemoved);
+
+    }
+
+    // -- HELPER
+
+    /**
+     * Returns the future's result as text, or the text of the exception if the
+     * result could not be obtained (which then makes the assertions fail).
+     */
+    private static String resultOrFailureText(final Future<Object> future) {
+        try {
+            return ""+future.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to the test runner
+            return ""+e;
+        } catch (ExecutionException e) {
+            return ""+e;
+        }
+    }
+
+    /**
+     * Thread-safe text accumulator.
+     * <p>
+     * {@link StringBuffer} synchronizes {@code append} and {@code toString} itself,
+     * so no additional locking is required here.
+     */
+    private static class Buffer {
+
+        private final StringBuffer sb = new StringBuffer();
+
+        public Buffer append(final CharSequence c) {
+            sb.append(c);
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            return sb.toString();
+        }
+
+    }
+
+}