You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/11/20 14:55:11 UTC
[28/50] [abbrv] ignite git commit: IGNITE-10285 Fixed U.doInParallel
may lead to deadlock - Fixes #5404.
IGNITE-10285 Fixed U.doInParallel may lead to deadlock - Fixes #5404.
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/acfdcdaa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/acfdcdaa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/acfdcdaa
Branch: refs/heads/ignite-10044
Commit: acfdcdaa0f667b1ebebc26cea90df030804056a7
Parents: a63a81a
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Mon Nov 19 16:41:28 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Mon Nov 19 16:41:28 2018 +0300
----------------------------------------------------------------------
.../ignite/internal/util/IgniteUtils.java | 130 ++++++++++++++--
.../internal/util/IgniteUtilsSelfTest.java | 147 ++++++++++++++++++-
2 files changed, 257 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/acfdcdaa/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index fc6be6d..f1f4253 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -10782,40 +10782,92 @@ public abstract class IgniteUtils {
) throws IgniteCheckedException, IgniteInterruptedCheckedException {
if(srcDatas.isEmpty())
return Collections.emptyList();
+
int[] batchSizes = calculateOptimalBatchSizes(parallelismLvl, srcDatas.size());
- List<List<T>> batches = new ArrayList<>(batchSizes.length);
+ List<Batch<T, R>> batches = new ArrayList<>(batchSizes.length);
+
+ // Set for sharing batches between executor and current thread.
+ // If executor cannot perform immediately, we will execute task in the current thread.
+ Set<Batch<T, R>> sharedBatchesSet = new GridConcurrentHashSet<>(batchSizes.length);
Iterator<T> iterator = srcDatas.iterator();
- for (int batchSize : batchSizes) {
- List<T> batch = new ArrayList<>(batchSize);
+ for (int idx = 0; idx < batchSizes.length; idx++) {
+ int batchSize = batchSizes[idx];
+
+ Batch<T, R> batch = new Batch<>(batchSize);
for (int i = 0; i < batchSize; i++)
- batch.add(iterator.next());
+ batch.addTask(iterator.next());
batches.add(batch);
}
- List<Future<Collection<R>>> consumerFutures = batches.stream()
- .filter(batch -> !batch.isEmpty())
- .map(batch -> executorSvc.submit(() -> {
- Collection<R> results = new ArrayList<>(batch.size());
+ batches = batches.stream()
+ .filter(batch -> !batch.tasks.isEmpty())
+ // Add to set only after check that batch is not empty.
+ .peek(sharedBatchesSet::add)
+ // Setup future in batch for waiting result.
+ .peek(batch -> batch.future = executorSvc.submit(() -> {
+ // Batch was already stolen by the calling thread.
+ if (!sharedBatchesSet.remove(batch)) {
+ return null;
+ }
+
+ Collection<R> results = new ArrayList<>(batch.tasks.size());
- for (T item : batch)
+ for (T item : batch.tasks)
results.add(operation.accept(item));
return results;
}))
.collect(Collectors.toList());
- Throwable error =null;
+ Throwable error = null;
+
+ // Stealing jobs if executor is busy and cannot process task immediately.
+ // Perform batches in a current thread.
+ for (Batch<T, R> batch : sharedBatchesSet) {
+ // The executor has already taken this batch.
+ if (!sharedBatchesSet.remove(batch))
+ continue;
+
+ Collection<R> res = new ArrayList<>(batch.tasks.size());
+
+ try {
+ for (T item : batch.tasks)
+ res.add(operation.accept(item));
+
+ batch.result(res);
+ }
+ catch (IgniteCheckedException e) {
+ batch.result(e);
+ }
+ }
+ // Final result collection.
Collection<R> results = new ArrayList<>(srcDatas.size());
- for (Future<Collection<R>> future : consumerFutures) {
+ for (Batch<T, R> batch: batches) {
try {
- results.addAll(future.get());
+ Throwable err = batch.error;
+
+ if (err != null) {
+ if (error == null)
+ error = err;
+ else
+ error.addSuppressed(err);
+
+ continue;
+ }
+
+ Collection<R> res = batch.result();
+
+ if (res != null)
+ results.addAll(res);
+ else
+ assert error != null;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -10853,6 +10905,60 @@ public abstract class IgniteUtils {
}
/**
+ * A batch of tasks together with its processing outcome (result or error) and the future of its executor job.
+ */
+ private static class Batch<T,R> {
+ /** Tasks (source items) that belong to this batch. */
+ private final List<T> tasks;
+
+ /** Result stored via {@link #result(Collection)}; {@code null} until it is set. */
+ private Collection<R> result;
+
+ /** Error stored via {@link #result(Throwable)}; {@code null} until it is set. */
+ private Throwable error;
+
+ /** Future of the job submitted to the executor for this batch; must be assigned before {@link #result()} is called. */
+ private Future<Collection<R>> future;
+
+ /**
+ * @param batchSize Expected number of tasks; used as the initial capacity of the task list.
+ */
+ private Batch(int batchSize) {
+ this.tasks = new ArrayList<>(batchSize);
+ }
+
+ /**
+ * @param task Task to append to this batch.
+ */
+ public void addTask(T task){
+ tasks.add(task);
+ }
+
+ /**
+ * @param res Results of the tasks of this batch.
+ */
+ public void result(Collection<R> res) {
+ this.result = res;
+ }
+
+ /**
+ * @param e Throwable if task was completed with error.
+ */
+ public void result(Throwable e) {
+ this.error = e;
+ }
+
+ /**
+ * @return Stored result if it is set, otherwise the value obtained by waiting on {@link #future}.
+ */
+ public Collection<R> result() throws ExecutionException, InterruptedException {
+ assert future != null;
+
+ return result != null ? result : future.get();
+ }
+ }
+
+ /**
* Split number of tasks into optimized batches.
* @param parallelismLvl Level of parallelism.
* @param size number of tasks to split.
http://git-wip-us.apache.org/repos/asf/ignite/blob/acfdcdaa/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 2f52b21..39bb21ea 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -44,20 +44,27 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -70,7 +77,9 @@ import org.apache.ignite.testframework.http.GridEmbeddedHttpServer;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.jetbrains.annotations.Nullable;
+import org.junit.Assert;
+import static java.util.Arrays.asList;
import static org.junit.Assert.assertArrayEquals;
/**
@@ -334,7 +343,7 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
arr = new SelfReferencedJob[] {this, this};
- col = Arrays.asList(this, this, this);
+ col = asList(this, this, this);
newContext();
@@ -875,10 +884,10 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
TcpDiscoveryNode node250ts = new TcpDiscoveryNode();
node250ts.version(v250ts);
- assertTrue(U.isOldestNodeVersionAtLeast(v240, Arrays.asList(node240, node241, node250, node250ts)));
- assertFalse(U.isOldestNodeVersionAtLeast(v241, Arrays.asList(node240, node241, node250, node250ts)));
- assertTrue(U.isOldestNodeVersionAtLeast(v250, Arrays.asList(node250, node250ts)));
- assertTrue(U.isOldestNodeVersionAtLeast(v250ts, Arrays.asList(node250, node250ts)));
+ assertTrue(U.isOldestNodeVersionAtLeast(v240, asList(node240, node241, node250, node250ts)));
+ assertFalse(U.isOldestNodeVersionAtLeast(v241, asList(node240, node241, node250, node250ts)));
+ assertTrue(U.isOldestNodeVersionAtLeast(v250, asList(node250, node250ts)));
+ assertTrue(U.isOldestNodeVersionAtLeast(v250ts, asList(node250, node250ts)));
}
/**
@@ -892,7 +901,7 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
try {
IgniteUtils.doInParallel(3,
executorService,
- Arrays.asList(1, 2, 3),
+ asList(1, 2, 3),
i -> {
try {
barrier.await(1, TimeUnit.SECONDS);
@@ -920,7 +929,7 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
try {
IgniteUtils.doInParallel(2,
executorService,
- Arrays.asList(1, 2, 3),
+ asList(1, 2, 3),
i -> {
try {
barrier.await(400, TimeUnit.MILLISECONDS);
@@ -989,6 +998,128 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
}
/**
+ * Checks that batches are split between the calling thread and the pool thread when the pool is initially busy.
+ */
+ public void testDoInParallelWithStealingJob() throws IgniteCheckedException {
+ // Pool size should be less than the input data collection size.
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+ CountDownLatch mainThreadLatch = new CountDownLatch(1);
+ CountDownLatch poolThreadLatch = new CountDownLatch(1);
+
+ // Occupy the only pool thread until poolThreadLatch is released.
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ poolThreadLatch.await();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ }
+ });
+
+ List<Integer> data = asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ AtomicInteger taskProcessed = new AtomicInteger();
+
+ long threadId = Thread.currentThread().getId();
+
+ AtomicInteger curThreadCnt = new AtomicInteger();
+ AtomicInteger poolThreadCnt = new AtomicInteger();
+
+ Collection<Integer> res = U.doInParallel(10,
+ executorService,
+ data,
+ new IgniteThrowableConsumer<Integer, Integer>() {
+ @Override public Integer accept(Integer cnt) throws IgniteInterruptedCheckedException {
+ // Release thread in pool in the middle of range.
+ if (taskProcessed.getAndIncrement() == (data.size() / 2) - 1) {
+ poolThreadLatch.countDown();
+
+ try {
+ // Wait until the pool thread has processed the remaining tasks.
+ mainThreadLatch.await();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedCheckedException(e);
+ }
+ }
+
+ // Increment if executed in current thread.
+ if (Thread.currentThread().getId() == threadId)
+ curThreadCnt.incrementAndGet();
+ else {
+ poolThreadCnt.incrementAndGet();
+
+ if (taskProcessed.get() == data.size())
+ mainThreadLatch.countDown();
+ }
+
+ return -cnt;
+ }
+ });
+
+ Assert.assertEquals(curThreadCnt.get() + poolThreadCnt.get(), data.size());
+ Assert.assertEquals(5, curThreadCnt.get());
+ Assert.assertEquals(5, poolThreadCnt.get());
+ Assert.assertEquals(asList(0, -1, -2, -3, -4, -5, -6, -7, -8, -9), res);
+ }
+
+ /**
+ * Checks that {@code doInParallel} completes when it is invoked from tasks running in the same executor it submits to.
+ */
+ public void testDoInParallelWithStealingJobRunTaskInExecutor() throws Exception {
+ // Pool size should be less than the input data collection size.
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+
+ Future<?> f1 = executorService.submit(()-> runTask(executorService));
+ Future<?> f2 = executorService.submit(()-> runTask(executorService));
+ Future<?> f3 = executorService.submit(()-> runTask(executorService));
+
+ f1.get();
+ f2.get();
+ f3.get();
+ }
+
+ /**
+ * Runs {@code U.doInParallel} over ten integers on the given executor and verifies the negated results.
+ * @param executorService Executor service.
+ */
+ private void runTask(ExecutorService executorService) {
+ List<Integer> data = asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ long threadId = Thread.currentThread().getId();
+
+ AtomicInteger curThreadCnt = new AtomicInteger();
+
+ Collection<Integer> res;
+
+ try {
+ res = U.doInParallel(10,
+ executorService,
+ data,
+ new IgniteThrowableConsumer<Integer, Integer>() {
+ @Override public Integer accept(Integer cnt) {
+ if (Thread.currentThread().getId() == threadId)
+ curThreadCnt.incrementAndGet();
+
+ return -cnt;
+ }
+ });
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+
+ Assert.assertTrue(curThreadCnt.get() > 0);
+ Assert.assertEquals(asList(0, -1, -2, -3, -4, -5, -6, -7, -8, -9), res);
+ }
+
+ /**
* Template method to test parallel execution
* @param executorService ExecutorService.
* @param size Size.
@@ -1030,7 +1161,7 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
IgniteUtils.doInParallel(
1,
executorService,
- Arrays.asList(1, 2, 3),
+ asList(1, 2, 3),
i -> {
if (Integer.valueOf(1).equals(i))
throw new IgniteCheckedException(expectedException);