You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@groovy.apache.org by su...@apache.org on 2021/07/31 16:35:59 UTC
[groovy] branch master updated: Avoid nested parallel querying which results in deadlock sometimes
This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/master by this push:
new 7442bc8 Avoid nested parallel querying which results in deadlock sometimes
7442bc8 is described below
commit 7442bc8925acf4d2ce9a1340f17f5aa296c3acbe
Author: Daniel Sun <su...@apache.org>
AuthorDate: Sun Aug 1 00:35:18 2021 +0800
Avoid nested parallel querying which results in deadlock sometimes
---
.../collection/runtime/ConcurrentObjectHolder.java | 40 ++---------
.../collection/runtime/QueryableCollection.java | 78 ++++++++++------------
.../test/org/apache/groovy/ginq/GinqTest.groovy | 26 ++++++++
3 files changed, 68 insertions(+), 76 deletions(-)
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/ConcurrentObjectHolder.java b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/ConcurrentObjectHolder.java
index ad6b32d..bec6705 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/ConcurrentObjectHolder.java
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/ConcurrentObjectHolder.java
@@ -20,10 +20,6 @@ package org.apache.groovy.ginq.provider.collection.runtime;
import org.apache.groovy.internal.util.Supplier;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
/**
* Hold an object thread-safely
*
@@ -31,47 +27,21 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* @since 4.0.0
*/
class ConcurrentObjectHolder<T> {
- private final ReadWriteLock rwl = new ReentrantReadWriteLock();
- private final Lock readLock = rwl.readLock();
- private final Lock writeLock = rwl.writeLock();
-
private volatile T object;
+ private final Supplier<T> supplier;
- public ConcurrentObjectHolder() {}
-
- public ConcurrentObjectHolder(T object) {
- this.object = object;
+ public ConcurrentObjectHolder(Supplier<T> supplier) {
+ this.supplier = supplier;
}
public T getObject() {
- readLock.lock();
- try {
- return object;
- } finally {
- readLock.unlock();
- }
- }
-
- public T getObject(Supplier<? extends T> def) {
if (null != object) return object;
- writeLock.lock();
- try {
+ synchronized(this) {
if (null == object) {
- object = def.get();
+ object = supplier.get();
}
return object;
- } finally {
- writeLock.unlock();
- }
- }
-
- public void setObject(T object) {
- writeLock.lock();
- try {
- this.object = object;
- } finally {
- writeLock.unlock();
}
}
}
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
index d5d2025..cc5e7eb 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
@@ -120,11 +120,11 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
@Override
public <U> Queryable<Tuple2<T, U>> innerHashJoin(Queryable<? extends U> queryable, Function<? super T, ?> fieldsExtractor1, Function<? super U, ?> fieldsExtractor2) {
- final ConcurrentObjectHolder<Map<Integer, List<Candidate<U>>>> hashTableHolder = new ConcurrentObjectHolder<>();
- final Supplier<Map<Integer, List<Candidate<U>>>> hashTableSupplier = createHashTableSupplier(queryable, fieldsExtractor2);
+ final ConcurrentObjectHolder<Map<Integer, List<Candidate<U>>>> hashTableHolder = new ConcurrentObjectHolder<>(createHashTableSupplier(queryable, fieldsExtractor2));
+ if (isParallel()) hashTableHolder.getObject(); // avoid nested parallel querying, which results in deadlock sometimes
Stream<Tuple2<T, U>> stream = this.stream().flatMap(p -> {
// build hash table
- Map<Integer, List<Candidate<U>>> hashTable = buildHashTable(hashTableHolder, hashTableSupplier);
+ Map<Integer, List<Candidate<U>>> hashTable = hashTableHolder.getObject();
// probe the hash table
return probeHashTable(hashTable, p, fieldsExtractor1);
@@ -298,41 +298,41 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
public <U> Queryable<U> select(BiFunction<? super T, ? super Queryable<? extends T>, ? extends U> mapper) {
final String originalParallel = QueryableHelper.getVar(PARALLEL);
QueryableHelper.setVar(PARALLEL, FALSE_STR); // ensure the row number is generated sequentially
- boolean useWindowFunction = TRUE_STR.equals(QueryableHelper.getVar(USE_WINDOW_FUNCTION));
-
- if (useWindowFunction) {
- this.makeReusable();
- }
-
- Stream<U> stream = null;
- if (this instanceof Group) {
- this.makeReusable();
- if (0 == this.count()) {
- stream = Stream.of((T) tuple(NULL, EMPTY_QUERYABLE)).map((T t) -> mapper.apply(t, this));
+ try {
+ boolean useWindowFunction = TRUE_STR.equals(QueryableHelper.getVar(USE_WINDOW_FUNCTION));
+ if (useWindowFunction) {
+ this.makeReusable();
}
- }
- if (null == stream) {
- stream = this.stream().map((T t) -> mapper.apply(t, this));
- }
-
- if (TRUE_STR.equals(originalParallel)) {
- // invoke `collect` to trigger the intermediate operator, which will create `CompletableFuture` instances
- stream = stream.collect(Collectors.toList()).parallelStream().map((U u) -> {
- boolean interrupted = false;
- try {
- return (U) ((CompletableFuture) u).get();
- } catch (InterruptedException | ExecutionException ex) {
- if (ex instanceof InterruptedException) interrupted = true;
- throw new GroovyRuntimeException(ex);
- } finally {
- if (interrupted) Thread.currentThread().interrupt();
+ Stream<U> stream = null;
+ if (this instanceof Group) {
+ this.makeReusable();
+ if (0 == this.count()) {
+ stream = Stream.of((T) tuple(NULL, EMPTY_QUERYABLE)).map((T t) -> mapper.apply(t, this));
}
- });
- }
+ }
+ if (null == stream) {
+ stream = this.stream().map((T t) -> mapper.apply(t, this));
+ }
- QueryableHelper.setVar(PARALLEL, originalParallel);
+ if (TRUE_STR.equals(originalParallel)) {
+ // invoke `collect` to trigger the intermediate operator, which will create `CompletableFuture` instances
+ stream = stream.collect(Collectors.toList()).parallelStream().map((U u) -> {
+ boolean interrupted = false;
+ try {
+ return (U) ((CompletableFuture) u).get();
+ } catch (InterruptedException | ExecutionException ex) {
+ if (ex instanceof InterruptedException) interrupted = true;
+ throw new GroovyRuntimeException(ex);
+ } finally {
+ if (interrupted) Thread.currentThread().interrupt();
+ }
+ });
+ }
- return from(stream);
+ return from(stream);
+ } finally {
+ QueryableHelper.setVar(PARALLEL, originalParallel);
+ }
}
@Override
@@ -526,11 +526,11 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
}
private static <T, U> Queryable<Tuple2<T, U>> outerHashJoin(Queryable<? extends T> queryable1, Queryable<? extends U> queryable2, Function<? super T, ?> fieldsExtractor1, Function<? super U, ?> fieldsExtractor2) {
- final ConcurrentObjectHolder<Map<Integer, List<Candidate<U>>>> hashTableHolder = new ConcurrentObjectHolder<>();
- final Supplier<Map<Integer, List<Candidate<U>>>> hashTableSupplier = createHashTableSupplier(queryable2, fieldsExtractor2);
+ final ConcurrentObjectHolder<Map<Integer, List<Candidate<U>>>> hashTableHolder = new ConcurrentObjectHolder<>(createHashTableSupplier(queryable2, fieldsExtractor2));
+ if (isParallel()) hashTableHolder.getObject(); // avoid nested parallel querying, which results in deadlock sometimes
Stream<Tuple2<T, U>> stream = queryable1.stream().flatMap(p -> {
// build hash table
- Map<Integer, List<Candidate<U>>> hashTable = buildHashTable(hashTableHolder, hashTableSupplier);
+ Map<Integer, List<Candidate<U>>> hashTable = hashTableHolder.getObject();
// probe the hash table
List<Tuple2<T, U>> joinResultList =
@@ -542,10 +542,6 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
return from(stream);
}
- private static <U> Map<Integer, List<Candidate<U>>> buildHashTable(final ConcurrentObjectHolder<Map<Integer, List<Candidate<U>>>> hashTableHolder, final Supplier<Map<Integer, List<Candidate<U>>>> hashTableSupplier) {
- return hashTableHolder.getObject(hashTableSupplier);
- }
-
private static <T, U> Stream<Tuple2<T, U>> probeHashTable(Map<Integer, List<Candidate<U>>> hashTable, T p, Function<? super T, ?> fieldsExtractor1) {
final Object otherFields = fieldsExtractor1.apply(p);
final Integer h = hash(otherFields);
diff --git a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
index 82e8024..a6bf068 100644
--- a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
+++ b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
@@ -5055,6 +5055,32 @@ class GinqTest {
}
@Test
+ void "testGinq - parallel - 7"() {
+ assertScript '''
+ import java.util.concurrent.CountDownLatch
+ int cnt = 10
+ def ready = new CountDownLatch(cnt)
+ def cdl = new CountDownLatch(1)
+ def threads = (0..<cnt).collect { seq ->
+ Thread.start {
+ ready.countDown()
+ cdl.await()
+ def result = GQ(optimize:false, parallel:true) {
+ from n1 in 1..100_000
+ join n2 in 1..100_000 on n2 == n1
+ where n1 < 10 && n2 < 20
+ select n1, n2
+ }.toList()
+ assert [[1, 1], [2, 2], [3, 3], [4, 4], [5, 5], [6, 6], [7, 7], [8, 8], [9, 9]] == result
+ }
+ }
+ ready.await()
+ cdl.countDown()
+ threads*.join()
+ '''
+ }
+
+ @Test
void "testGinq - window - 0"() {
assertGinqScript '''
// tag::ginq_winfunction_01[]