You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/11/19 13:41:40 UTC

ignite git commit: IGNITE-10285 Fixed U.doInParallel may lead to deadlock - Fixes #5404.

Repository: ignite
Updated Branches:
  refs/heads/master a63a81a51 -> acfdcdaa0


IGNITE-10285 Fixed U.doInParallel may lead to deadlock - Fixes #5404.

Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/acfdcdaa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/acfdcdaa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/acfdcdaa

Branch: refs/heads/master
Commit: acfdcdaa0f667b1ebebc26cea90df030804056a7
Parents: a63a81a
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Mon Nov 19 16:41:28 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Mon Nov 19 16:41:28 2018 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/IgniteUtils.java       | 130 ++++++++++++++--
 .../internal/util/IgniteUtilsSelfTest.java      | 147 ++++++++++++++++++-
 2 files changed, 257 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/acfdcdaa/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index fc6be6d..f1f4253 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -10782,40 +10782,92 @@ public abstract class IgniteUtils {
     ) throws IgniteCheckedException, IgniteInterruptedCheckedException {
         if(srcDatas.isEmpty())
             return Collections.emptyList();
+
         int[] batchSizes = calculateOptimalBatchSizes(parallelismLvl, srcDatas.size());
 
-        List<List<T>> batches = new ArrayList<>(batchSizes.length);
+        List<Batch<T, R>> batches = new ArrayList<>(batchSizes.length);
+
+        // Set for sharing batches between executor and current thread.
+        // If executor cannot perform immediately, we will execute task in the current thread.
+        Set<Batch<T, R>> sharedBatchesSet = new GridConcurrentHashSet<>(batchSizes.length);
 
         Iterator<T> iterator = srcDatas.iterator();
 
-        for (int batchSize : batchSizes) {
-            List<T> batch = new ArrayList<>(batchSize);
+        for (int idx = 0; idx < batchSizes.length; idx++) {
+            int batchSize = batchSizes[idx];
+
+            Batch<T, R> batch = new Batch<>(batchSize);
 
             for (int i = 0; i < batchSize; i++)
-                batch.add(iterator.next());
+                batch.addTask(iterator.next());
 
             batches.add(batch);
         }
 
-        List<Future<Collection<R>>> consumerFutures = batches.stream()
-            .filter(batch -> !batch.isEmpty())
-            .map(batch -> executorSvc.submit(() -> {
-                Collection<R> results = new ArrayList<>(batch.size());
+        batches = batches.stream()
+            .filter(batch -> !batch.tasks.isEmpty())
+            // Add to set only after check that batch is not empty.
+            .peek(sharedBatchesSet::add)
+            // Setup future in batch for waiting result.
+            .peek(batch -> batch.future = executorSvc.submit(() -> {
+                // Batch was already taken (stolen) by the calling thread.
+                if (!sharedBatchesSet.remove(batch)) {
+                    return null;
+                }
+
+                Collection<R> results = new ArrayList<>(batch.tasks.size());
 
-                for (T item : batch)
+                for (T item : batch.tasks)
                     results.add(operation.accept(item));
 
                 return results;
             }))
             .collect(Collectors.toList());
 
-        Throwable error =null;
+        Throwable error = null;
+
+        // Stealing jobs if executor is busy and cannot process task immediately.
+        // Perform batches in a current thread.
+        for (Batch<T, R> batch : sharedBatchesSet) {
+            // Executor has already taken this batch.
+            if (!sharedBatchesSet.remove(batch))
+                continue;
+
+            Collection<R> res = new ArrayList<>(batch.tasks.size());
+
+            try {
+                for (T item : batch.tasks)
+                    res.add(operation.accept(item));
+
+                batch.result(res);
+            }
+            catch (IgniteCheckedException e) {
+                batch.result(e);
+            }
+        }
 
+        // Final result collection.
         Collection<R> results = new ArrayList<>(srcDatas.size());
 
-        for (Future<Collection<R>> future : consumerFutures) {
+        for (Batch<T, R> batch: batches) {
             try {
-                results.addAll(future.get());
+                Throwable err = batch.error;
+
+                if (err != null) {
+                    if (error == null)
+                        error = err;
+                    else
+                        error.addSuppressed(err);
+
+                    continue;
+                }
+
+                Collection<R> res = batch.result();
+
+                if (res != null)
+                    results.addAll(res);
+                else
+                    assert error != null;
             }
             catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -10853,6 +10905,60 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * A batch of source items processed together, plus its outcome: result, error or pending future.
+     */
+    private static class Batch<T,R> {
+        /** Source items that make up this batch. */
+        private final List<T> tasks;
+
+        /** Results stored directly via {@link #result(Collection)}; {@code null} until set. */
+        private Collection<R> result;
+
+        /** Error stored via {@link #result(Throwable)}; {@code null} until set. */
+        private Throwable error;
+
+        /** Future consulted by {@link #result()} when no result has been stored directly. */
+        private Future<Collection<R>> future;
+
+        /**
+         * @param batchSize Expected number of items, used as the initial capacity of the task list.
+         */
+        private Batch(int batchSize) {
+            this.tasks = new ArrayList<>(batchSize);
+        }
+
+        /**
+         * @param task Item to append to this batch.
+         */
+        public void addTask(T task){
+            tasks.add(task);
+        }
+
+        /**
+         * @param res Results computed for the items of this batch.
+         */
+        public void result(Collection<R> res) {
+            this.result = res;
+        }
+
+        /**
+         * @param e Error with which processing of this batch failed.
+         */
+        public void result(Throwable e) {
+            this.error = e;
+        }
+
+        /**
+         * Returns the directly stored result if present, otherwise blocks on the future and returns its value.
+         */
+        public Collection<R> result() throws ExecutionException, InterruptedException {
+            assert future != null;
+
+            return result != null ? result : future.get();
+        }
+    }
+
+    /**
      * Split number of tasks into optimized batches.
      * @param parallelismLvl Level of parallelism.
      * @param size number of tasks to split.

http://git-wip-us.apache.org/repos/asf/ignite/blob/acfdcdaa/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 2f52b21..39bb21ea 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -44,20 +44,27 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.igfs.IgfsUtils;
 import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -70,7 +77,9 @@ import org.apache.ignite.testframework.http.GridEmbeddedHttpServer;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.common.GridCommonTest;
 import org.jetbrains.annotations.Nullable;
+import org.junit.Assert;
 
+import static java.util.Arrays.asList;
 import static org.junit.Assert.assertArrayEquals;
 
 /**
@@ -334,7 +343,7 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
 
             arr = new SelfReferencedJob[] {this, this};
 
-            col = Arrays.asList(this, this, this);
+            col = asList(this, this, this);
 
             newContext();
 
@@ -875,10 +884,10 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
         TcpDiscoveryNode node250ts = new TcpDiscoveryNode();
         node250ts.version(v250ts);
 
-        assertTrue(U.isOldestNodeVersionAtLeast(v240, Arrays.asList(node240, node241, node250, node250ts)));
-        assertFalse(U.isOldestNodeVersionAtLeast(v241, Arrays.asList(node240, node241, node250, node250ts)));
-        assertTrue(U.isOldestNodeVersionAtLeast(v250, Arrays.asList(node250, node250ts)));
-        assertTrue(U.isOldestNodeVersionAtLeast(v250ts, Arrays.asList(node250, node250ts)));
+        assertTrue(U.isOldestNodeVersionAtLeast(v240, asList(node240, node241, node250, node250ts)));
+        assertFalse(U.isOldestNodeVersionAtLeast(v241, asList(node240, node241, node250, node250ts)));
+        assertTrue(U.isOldestNodeVersionAtLeast(v250, asList(node250, node250ts)));
+        assertTrue(U.isOldestNodeVersionAtLeast(v250ts, asList(node250, node250ts)));
     }
 
     /**
@@ -892,7 +901,7 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
         try {
             IgniteUtils.doInParallel(3,
                 executorService,
-                Arrays.asList(1, 2, 3),
+                asList(1, 2, 3),
                 i -> {
                     try {
                         barrier.await(1, TimeUnit.SECONDS);
@@ -920,7 +929,7 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
         try {
             IgniteUtils.doInParallel(2,
                 executorService,
-                Arrays.asList(1, 2, 3),
+                asList(1, 2, 3),
                 i -> {
                     try {
                         barrier.await(400, TimeUnit.MILLISECONDS);
@@ -989,6 +998,128 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Tests that the calling thread steals batches while the only pool thread is busy.
+     */
+    public void testDoInParallelWithStealingJob() throws IgniteCheckedException {
+        // Single-thread pool: fewer threads than items in the input data collection.
+        ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+        CountDownLatch mainThreadLatch = new CountDownLatch(1);
+        CountDownLatch poolThreadLatch = new CountDownLatch(1);
+
+        // Occupy the only pool thread until poolThreadLatch is released.
+        executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    poolThreadLatch.await();
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteInterruptedException(e);
+                }
+            }
+        });
+
+        List<Integer> data = asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+        AtomicInteger taskProcessed = new AtomicInteger();
+
+        long threadId = Thread.currentThread().getId();
+
+        AtomicInteger curThreadCnt = new AtomicInteger();
+        AtomicInteger poolThreadCnt = new AtomicInteger();
+
+        Collection<Integer> res = U.doInParallel(10,
+            executorService,
+            data,
+            new IgniteThrowableConsumer<Integer, Integer>() {
+                @Override public Integer accept(Integer cnt) throws IgniteInterruptedCheckedException {
+                    // Release the pool thread once half of the items have been started.
+                    if (taskProcessed.getAndIncrement() == (data.size() / 2) - 1) {
+                        poolThreadLatch.countDown();
+
+                        try {
+                            // Wait until a non-calling thread has seen all items started.
+                            mainThreadLatch.await();
+                        }
+                        catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+
+                            throw new IgniteInterruptedCheckedException(e);
+                        }
+                    }
+
+                    // Count executions per thread: calling thread vs. any other (pool) thread.
+                    if (Thread.currentThread().getId() == threadId)
+                        curThreadCnt.incrementAndGet();
+                    else {
+                        poolThreadCnt.incrementAndGet();
+
+                        if (taskProcessed.get() == data.size())
+                            mainThreadLatch.countDown();
+                    }
+
+                    return -cnt;
+                }
+            });
+
+        Assert.assertEquals(curThreadCnt.get() + poolThreadCnt.get(), data.size());
+        Assert.assertEquals(5, curThreadCnt.get());
+        Assert.assertEquals(5, poolThreadCnt.get());
+        Assert.assertEquals(asList(0, -1, -2, -3, -4, -5, -6, -7, -8, -9), res);
+    }
+
+    /**
+     * Tests doInParallel when it is called from tasks running on the same executor it submits to.
+     */
+    public void testDoInParallelWithStealingJobRunTaskInExecutor() throws Exception {
+        // Pool of 2 threads: fewer than the 3 callers submitted below and than the input data size.
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+
+        Future<?> f1 = executorService.submit(()-> runTask(executorService));
+        Future<?> f2 = executorService.submit(()-> runTask(executorService));
+        Future<?> f3 = executorService.submit(()-> runTask(executorService));
+
+        f1.get();
+        f2.get();
+        f3.get();
+    }
+
+    /**
+     * Runs doInParallel over ten integers; checks the result and that the calling thread did some work.
+     * @param executorService Executor service.
+     */
+    private void runTask(ExecutorService executorService) {
+        List<Integer> data = asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+        long threadId = Thread.currentThread().getId();
+
+        AtomicInteger curThreadCnt = new AtomicInteger();
+
+        Collection<Integer> res;
+
+        try {
+            res = U.doInParallel(10,
+                executorService,
+                data,
+                new IgniteThrowableConsumer<Integer, Integer>() {
+                    @Override public Integer accept(Integer cnt) {
+                        if (Thread.currentThread().getId() == threadId)
+                            curThreadCnt.incrementAndGet();
+
+                        return -cnt;
+                    }
+                });
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+
+        Assert.assertTrue(curThreadCnt.get() > 0);
+        Assert.assertEquals(asList(0, -1, -2, -3, -4, -5, -6, -7, -8, -9), res);
+    }
+
+    /**
      * Template method to test parallel execution
      * @param executorService ExecutorService.
      * @param size Size.
@@ -1030,7 +1161,7 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
             IgniteUtils.doInParallel(
                 1,
                 executorService,
-                Arrays.asList(1, 2, 3),
+                asList(1, 2, 3),
                 i -> {
                     if (Integer.valueOf(1).equals(i))
                         throw new IgniteCheckedException(expectedException);