You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@groovy.apache.org by su...@apache.org on 2021/07/31 16:35:59 UTC

[groovy] branch master updated: Avoid nested parallel querying which results in deadlock sometimes

This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/master by this push:
     new 7442bc8  Avoid nested parallel querying which results in deadlock sometimes
7442bc8 is described below

commit 7442bc8925acf4d2ce9a1340f17f5aa296c3acbe
Author: Daniel Sun <su...@apache.org>
AuthorDate: Sun Aug 1 00:35:18 2021 +0800

    Avoid nested parallel querying which results in deadlock sometimes
---
 .../collection/runtime/ConcurrentObjectHolder.java | 40 ++---------
 .../collection/runtime/QueryableCollection.java    | 78 ++++++++++------------
 .../test/org/apache/groovy/ginq/GinqTest.groovy    | 26 ++++++++
 3 files changed, 68 insertions(+), 76 deletions(-)

diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/ConcurrentObjectHolder.java b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/ConcurrentObjectHolder.java
index ad6b32d..bec6705 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/ConcurrentObjectHolder.java
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/ConcurrentObjectHolder.java
@@ -20,10 +20,6 @@ package org.apache.groovy.ginq.provider.collection.runtime;
 
 import org.apache.groovy.internal.util.Supplier;
 
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 /**
  * Hold an object thread-safely
  *
@@ -31,47 +27,21 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * @since 4.0.0
  */
 class ConcurrentObjectHolder<T> {
-    private final ReadWriteLock rwl = new ReentrantReadWriteLock();
-    private final Lock readLock = rwl.readLock();
-    private final Lock writeLock = rwl.writeLock();
-
     private volatile T object;
+    private final Supplier<T> supplier;
 
-    public ConcurrentObjectHolder() {}
-
-    public ConcurrentObjectHolder(T object) {
-        this.object = object;
+    public ConcurrentObjectHolder(Supplier<T> supplier) {
+        this.supplier = supplier;
     }
 
     public T getObject() {
-        readLock.lock();
-        try {
-            return object;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    public T getObject(Supplier<? extends T> def) {
         if (null != object) return object;
 
-        writeLock.lock();
-        try {
+        synchronized(this) {
             if (null == object) {
-                object = def.get();
+                object = supplier.get();
             }
             return object;
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    public void setObject(T object) {
-        writeLock.lock();
-        try {
-            this.object = object;
-        } finally {
-            writeLock.unlock();
         }
     }
 }
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
index d5d2025..cc5e7eb 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
@@ -120,11 +120,11 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
 
     @Override
     public <U> Queryable<Tuple2<T, U>> innerHashJoin(Queryable<? extends U> queryable, Function<? super T, ?> fieldsExtractor1, Function<? super U, ?> fieldsExtractor2) {
-        final ConcurrentObjectHolder<Map<Integer, List<Candidate<U>>>> hashTableHolder = new ConcurrentObjectHolder<>();
-        final Supplier<Map<Integer, List<Candidate<U>>>> hashTableSupplier = createHashTableSupplier(queryable, fieldsExtractor2);
+        final ConcurrentObjectHolder<Map<Integer, List<Candidate<U>>>> hashTableHolder = new ConcurrentObjectHolder<>(createHashTableSupplier(queryable, fieldsExtractor2));
+        if (isParallel()) hashTableHolder.getObject(); // avoid nested parallel querying, which results in deadlock sometimes
         Stream<Tuple2<T, U>> stream = this.stream().flatMap(p -> {
             // build hash table
-            Map<Integer, List<Candidate<U>>> hashTable = buildHashTable(hashTableHolder, hashTableSupplier);
+            Map<Integer, List<Candidate<U>>> hashTable = hashTableHolder.getObject();
 
             // probe the hash table
             return probeHashTable(hashTable, p, fieldsExtractor1);
@@ -298,41 +298,41 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
     public <U> Queryable<U> select(BiFunction<? super T, ? super Queryable<? extends T>, ? extends U> mapper) {
         final String originalParallel = QueryableHelper.getVar(PARALLEL);
         QueryableHelper.setVar(PARALLEL, FALSE_STR); // ensure the row number is generated sequentially
-        boolean useWindowFunction = TRUE_STR.equals(QueryableHelper.getVar(USE_WINDOW_FUNCTION));
-
-        if (useWindowFunction) {
-            this.makeReusable();
-        }
-
-        Stream<U> stream = null;
-        if (this instanceof Group) {
-            this.makeReusable();
-            if (0 == this.count()) {
-                stream = Stream.of((T) tuple(NULL, EMPTY_QUERYABLE)).map((T t) -> mapper.apply(t, this));
+        try {
+            boolean useWindowFunction = TRUE_STR.equals(QueryableHelper.getVar(USE_WINDOW_FUNCTION));
+            if (useWindowFunction) {
+                this.makeReusable();
             }
-        }
-        if (null == stream) {
-            stream = this.stream().map((T t) -> mapper.apply(t, this));
-        }
-
-        if (TRUE_STR.equals(originalParallel)) {
-            // invoke `collect` to trigger the intermediate operator, which will create `CompletableFuture` instances
-            stream = stream.collect(Collectors.toList()).parallelStream().map((U u) -> {
-                boolean interrupted = false;
-                try {
-                    return (U) ((CompletableFuture) u).get();
-                } catch (InterruptedException | ExecutionException ex) {
-                    if (ex instanceof InterruptedException) interrupted = true;
-                    throw new GroovyRuntimeException(ex);
-                } finally {
-                    if (interrupted) Thread.currentThread().interrupt();
+            Stream<U> stream = null;
+            if (this instanceof Group) {
+                this.makeReusable();
+                if (0 == this.count()) {
+                    stream = Stream.of((T) tuple(NULL, EMPTY_QUERYABLE)).map((T t) -> mapper.apply(t, this));
                 }
-            });
-        }
+            }
+            if (null == stream) {
+                stream = this.stream().map((T t) -> mapper.apply(t, this));
+            }
 
-        QueryableHelper.setVar(PARALLEL, originalParallel);
+            if (TRUE_STR.equals(originalParallel)) {
+                // invoke `collect` to trigger the intermediate operator, which will create `CompletableFuture` instances
+                stream = stream.collect(Collectors.toList()).parallelStream().map((U u) -> {
+                    boolean interrupted = false;
+                    try {
+                        return (U) ((CompletableFuture) u).get();
+                    } catch (InterruptedException | ExecutionException ex) {
+                        if (ex instanceof InterruptedException) interrupted = true;
+                        throw new GroovyRuntimeException(ex);
+                    } finally {
+                        if (interrupted) Thread.currentThread().interrupt();
+                    }
+                });
+            }
 
-        return from(stream);
+            return from(stream);
+        } finally {
+            QueryableHelper.setVar(PARALLEL, originalParallel);
+        }
     }
 
     @Override
@@ -526,11 +526,11 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
     }
 
     private static <T, U> Queryable<Tuple2<T, U>> outerHashJoin(Queryable<? extends T> queryable1, Queryable<? extends U> queryable2, Function<? super T, ?> fieldsExtractor1, Function<? super U, ?> fieldsExtractor2) {
-        final ConcurrentObjectHolder<Map<Integer, List<Candidate<U>>>> hashTableHolder = new ConcurrentObjectHolder<>();
-        final Supplier<Map<Integer, List<Candidate<U>>>> hashTableSupplier = createHashTableSupplier(queryable2, fieldsExtractor2);
+        final ConcurrentObjectHolder<Map<Integer, List<Candidate<U>>>> hashTableHolder = new ConcurrentObjectHolder<>(createHashTableSupplier(queryable2, fieldsExtractor2));
+        if (isParallel()) hashTableHolder.getObject(); // avoid nested parallel querying, which results in deadlock sometimes
         Stream<Tuple2<T, U>> stream = queryable1.stream().flatMap(p -> {
             // build hash table
-            Map<Integer, List<Candidate<U>>> hashTable = buildHashTable(hashTableHolder, hashTableSupplier);
+            Map<Integer, List<Candidate<U>>> hashTable = hashTableHolder.getObject();
 
             // probe the hash table
             List<Tuple2<T, U>> joinResultList =
@@ -542,10 +542,6 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
         return from(stream);
     }
 
-    private static <U> Map<Integer, List<Candidate<U>>> buildHashTable(final ConcurrentObjectHolder<Map<Integer, List<Candidate<U>>>> hashTableHolder, final Supplier<Map<Integer, List<Candidate<U>>>> hashTableSupplier) {
-        return hashTableHolder.getObject(hashTableSupplier);
-    }
-
     private static <T, U> Stream<Tuple2<T, U>> probeHashTable(Map<Integer, List<Candidate<U>>> hashTable, T p, Function<? super T, ?> fieldsExtractor1) {
         final Object otherFields = fieldsExtractor1.apply(p);
         final Integer h = hash(otherFields);
diff --git a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
index 82e8024..a6bf068 100644
--- a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
+++ b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
@@ -5055,6 +5055,32 @@ class GinqTest {
     }
 
     @Test
+    void "testGinq - parallel - 7"() {
+        assertScript '''
+            import java.util.concurrent.CountDownLatch
+            int cnt = 10
+            def ready = new CountDownLatch(cnt)
+            def cdl = new CountDownLatch(1)
+            def threads = (0..<cnt).collect { seq ->
+                Thread.start {
+                    ready.countDown()
+                    cdl.await()
+                    def result = GQ(optimize:false, parallel:true) {
+                        from n1 in 1..100_000
+                        join n2 in 1..100_000 on n2 == n1
+                        where n1 < 10 && n2 < 20
+                        select n1, n2
+                    }.toList()
+                    assert [[1, 1], [2, 2], [3, 3], [4, 4], [5, 5], [6, 6], [7, 7], [8, 8], [9, 9]] == result
+                }
+            }
+            ready.await()
+            cdl.countDown()
+            threads*.join()
+        '''
+    }
+
+    @Test
     void "testGinq - window - 0"() {
         assertGinqScript '''
 // tag::ginq_winfunction_01[]