You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@groovy.apache.org by su...@apache.org on 2020/12/14 16:14:34 UTC
[groovy] branch master updated: Support parallel querying in GINQ
This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/master by this push:
new 63928d9 Support parallel querying in GINQ
63928d9 is described below
commit 63928d9dcb6894a8083bb4e6612eed0f3bc5f82a
Author: Daniel Sun <su...@apache.org>
AuthorDate: Tue Dec 15 00:05:54 2020 +0800
Support parallel querying in GINQ
---
.../apache/groovy/ginq/GinqGroovyMethods.groovy | 9 ++-
.../org/apache/groovy/ginq/dsl/GinqAstBuilder.java | 3 +
.../ginq/provider/collection/GinqAstWalker.groovy | 22 +++++-
.../collection/runtime/QueryableCollection.java | 89 ++++++++++++++++------
.../collection/runtime/QueryableHelper.groovy | 15 +++-
.../groovy-ginq/src/spec/doc/ginq-userguide.adoc | 7 ++
.../test/org/apache/groovy/ginq/GinqTest.groovy | 26 +++++++
7 files changed, 143 insertions(+), 28 deletions(-)
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/GinqGroovyMethods.groovy b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/GinqGroovyMethods.groovy
index f870080..6d0b54f 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/GinqGroovyMethods.groovy
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/GinqGroovyMethods.groovy
@@ -75,7 +75,7 @@ class GinqGroovyMethods {
* Transform GINQ code to target method invocation
*
* @param ctx the macro context
- * @param ginqConfigurationMapExpression specify the configuration for GINQ, e.g. {@code astWalker}, {@code optimize}
+ * @param ginqConfigurationMapExpression specify the configuration for GINQ, e.g. {@code astWalker}, {@code optimize}, {@code parallel}
* @param ginqClosureExpression hold the GINQ code
* @return target method invocation
* @since 4.0.0
@@ -92,7 +92,7 @@ class GinqGroovyMethods {
Map<String, String> configuration = createConfiguration(ginqConfigurationMapExpression)
- if (TRUE == configuration.get(CONF_OPTIMIZE, TRUE)) {
+ if (TRUE_STR == configuration.get(CONF_OPTIMIZE, TRUE_STR)) {
GinqAstOptimizer ginqAstOptimizer = new GinqAstOptimizer()
ginqAstOptimizer.visitGinqExpression(ginqExpression)
}
@@ -116,8 +116,9 @@ class GinqGroovyMethods {
private GinqGroovyMethods() {}
- private static final String CONF_OPTIMIZE = 'optimize'
+ public static final String CONF_PARALLEL = 'parallel'
private static final String CONF_AST_WALKER = 'astWalker'
+ private static final String CONF_OPTIMIZE = 'optimize'
private static final String DEFAULT_AST_WALKER_CLASS_NAME = GinqAstWalker.class.name
- private static final String TRUE = 'true'
+ private static final String TRUE_STR = 'true'
}
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/dsl/GinqAstBuilder.java b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/dsl/GinqAstBuilder.java
index 68d5cb7..72ecd03 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/dsl/GinqAstBuilder.java
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/dsl/GinqAstBuilder.java
@@ -57,6 +57,7 @@ import java.util.Set;
* @since 4.0.0
*/
public class GinqAstBuilder extends CodeVisitorSupport implements SyntaxErrorReportable {
+ public static final String ROOT_GINQ_EXPRESSION = "__ROOT_GINQ_EXPRESSION";
private final Deque<GinqExpression> ginqExpressionStack = new ArrayDeque<>();
private GinqExpression latestGinqExpression;
private final SourceUnit sourceUnit;
@@ -88,6 +89,8 @@ public class GinqAstBuilder extends CodeVisitorSupport implements SyntaxErrorRep
methodCallExpression.getLineNumber(), methodCallExpression.getColumnNumber()));
}
+ latestGinqExpression.putNodeMetaData(ROOT_GINQ_EXPRESSION, latestGinqExpression);
+
return latestGinqExpression;
}
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/GinqAstWalker.groovy b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/GinqAstWalker.groovy
index bdb1678..b5c4b2f 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/GinqAstWalker.groovy
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/GinqAstWalker.groovy
@@ -19,7 +19,9 @@
package org.apache.groovy.ginq.provider.collection
import groovy.transform.CompileStatic
+import org.apache.groovy.ginq.GinqGroovyMethods
import org.apache.groovy.ginq.dsl.GinqAstBaseVisitor
+import org.apache.groovy.ginq.dsl.GinqAstBuilder
import org.apache.groovy.ginq.dsl.GinqAstVisitor
import org.apache.groovy.ginq.dsl.GinqSyntaxError
import org.apache.groovy.ginq.dsl.SyntaxErrorReportable
@@ -84,8 +86,10 @@ import static org.codehaus.groovy.ast.tools.GeneralUtils.localVarX
import static org.codehaus.groovy.ast.tools.GeneralUtils.param
import static org.codehaus.groovy.ast.tools.GeneralUtils.params
import static org.codehaus.groovy.ast.tools.GeneralUtils.propX
+import static org.codehaus.groovy.ast.tools.GeneralUtils.returnS
import static org.codehaus.groovy.ast.tools.GeneralUtils.stmt
import static org.codehaus.groovy.ast.tools.GeneralUtils.varX
+
/**
* Visit AST of GINQ to generate target method calls for GINQ
*
@@ -166,6 +170,13 @@ class GinqAstWalker implements GinqAstVisitor<Expression>, SyntaxErrorReportable
MethodCallExpression selectMethodCallExpression = this.visitSelectExpression(selectExpression)
List<Statement> statementList = []
+
+ boolean isRootGinqExpression = ginqExpression === ginqExpression.getNodeMetaData(GinqAstBuilder.ROOT_GINQ_EXPRESSION)
+ boolean parallelEnabled = isRootGinqExpression && TRUE_STR == configuration.get(GinqGroovyMethods.CONF_PARALLEL)
+ if (parallelEnabled) {
+ statementList << stmt(callX(QUERYABLE_HELPER_TYPE, 'setVar', args(new ConstantExpression(PARALLEL), new ConstantExpression(TRUE_STR))))
+ }
+
statementList << declS(
localVarX(metaDataMapName).tap {it.modifiers |= Opcodes.ACC_FINAL},
callX(MAPS_TYPE, "of", args(
@@ -177,7 +188,14 @@ class GinqAstWalker implements GinqAstVisitor<Expression>, SyntaxErrorReportable
if (rowNumberUsed) {
statementList << declS(localVarX(rowNumberName), new ConstantExpression(0L))
}
- statementList << stmt(selectMethodCallExpression)
+
+ final resultName = "__r${System.nanoTime()}"
+ statementList << declS(localVarX(resultName).tap {it.modifiers |= Opcodes.ACC_FINAL}, selectMethodCallExpression)
+
+ if (parallelEnabled) {
+ statementList << stmt(callX(QUERYABLE_HELPER_TYPE, 'removeVar', args(new ConstantExpression(PARALLEL))))
+ }
+ statementList << returnS(varX(resultName))
def result = callX(lambdaX(block(statementList as Statement[])), "call")
@@ -1084,6 +1102,8 @@ class GinqAstWalker implements GinqAstVisitor<Expression>, SyntaxErrorReportable
private static final String NAMEDRECORD_CLASS_NAME = NamedRecord.class.name
private static final String EMPTY_STRING = ''
+ private static final String PARALLEL = 'parallel'
+ private static final String TRUE_STR = 'true'
private static final String __METHOD_CALL_RECEIVER = "__METHOD_CALL_RECEIVER"
private static final String __GROUPBY_VISITED = "__GROUPBY_VISITED"
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
index c8881ac..3c6ffb1 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
@@ -39,6 +39,9 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -58,6 +61,11 @@ import static org.apache.groovy.ginq.provider.collection.runtime.Queryable.from;
@Internal
class QueryableCollection<T> implements Queryable<T>, Serializable {
private static final long serialVersionUID = -5067092453136522893L;
+ public static final String PARALLEL = "parallel";
+ public static final String TRUE_STR = "true";
+ private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+ private final Lock readLock = rwl.readLock();
+ private final Lock writeLock = rwl.writeLock();
private Iterable<T> sourceIterable;
private Stream<T> sourceStream;
@@ -70,11 +78,16 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
}
public Iterator<T> iterator() {
- if (null != sourceIterable) {
- return sourceIterable.iterator();
- }
+ readLock.lock();
+ try {
+ if (null != sourceIterable) {
+ return sourceIterable.iterator();
+ }
- return sourceStream.iterator();
+ return sourceStream.iterator();
+ } finally {
+ readLock.unlock();
+ }
}
@Override
@@ -100,8 +113,7 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
final Supplier<Map<Integer, List<U>>> hashTableSupplier = createHashTableSupplier(queryable, fieldsExtractor2);
Stream<Tuple2<T, U>> stream = this.stream().flatMap(p -> {
// build hash table
- Map<Integer, List<U>> hashTable =
- hashTableHolder.getObject(hashTableSupplier);
+ Map<Integer, List<U>> hashTable = buildHashTable(hashTableHolder, hashTableSupplier);
// probe the hash table
return probeHashTable(hashTable, p, fieldsExtractor1, fieldsExtractor2);
@@ -111,7 +123,7 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
}
private static <U> Supplier<Map<Integer, List<U>>> createHashTableSupplier(Queryable<? extends U> queryable, Function<? super U, ?> fieldsExtractor2) {
- return () -> queryable.stream().parallel()
+ return () -> queryable.stream()
.collect(
Collectors.toMap(
c -> hash(fieldsExtractor2.apply(c)),
@@ -402,8 +414,7 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
final Supplier<Map<Integer, List<U>>> hashTableSupplier = createHashTableSupplier(queryable2, fieldsExtractor2);
Stream<Tuple2<T, U>> stream = queryable1.stream().flatMap(p -> {
// build hash table
- Map<Integer, List<U>> hashTable =
- hashTableHolder.getObject(hashTableSupplier);
+ Map<Integer, List<U>> hashTable = buildHashTable(hashTableHolder, hashTableSupplier);
// probe the hash table
List<Tuple2<T, U>> joinResultList =
@@ -415,6 +426,16 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
return from(stream);
}
+ private static <U> Map<Integer, List<U>> buildHashTable(final ObjectHolder<Map<Integer, List<U>>> hashTableHolder, final Supplier<Map<Integer, List<U>>> hashTableSupplier) {
+ Map<Integer, List<U>> hashTable = hashTableHolder.getObject();
+ if (null == hashTable) {
+ synchronized (hashTableHolder) {
+ hashTable = hashTableHolder.getObject(hashTableSupplier);
+ }
+ }
+ return hashTable;
+ }
+
private static <T, U> Stream<Tuple2<T, U>> probeHashTable(Map<Integer, List<U>> hashTable, T p, Function<? super T, ?> fieldsExtractor1, Function<? super U, ?> fieldsExtractor2) {
final Object otherFields = fieldsExtractor1.apply(p);
return hashTable.entrySet().stream()
@@ -429,14 +450,19 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
@Override
public List<T> toList() {
- if (sourceIterable instanceof List) {
- return (List<T>) sourceIterable;
- }
+ writeLock.lock();
+ try {
+ if (sourceIterable instanceof List) {
+ return (List<T>) sourceIterable;
+ }
- final List<T> result = stream().collect(Collectors.toList());
- sourceIterable = result;
+ final List<T> result = stream().collect(Collectors.toList());
+ sourceIterable = result;
- return result;
+ return result;
+ } finally {
+ writeLock.unlock();
+ }
}
@Override
@@ -446,11 +472,20 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
@Override
public Stream<T> stream() {
- if (isReusable()) {
- sourceStream = toStream(sourceIterable); // we have to create new stream every time because Java stream can not be reused
- }
+ writeLock.lock();
+ try {
+ if (isReusable()) {
+ sourceStream = toStream(sourceIterable); // we have to create new stream every time because Java stream can not be reused
+ }
+
+ if (!sourceStream.isParallel() && TRUE_STR.equals(QueryableHelper.getVar(PARALLEL))) {
+ sourceStream = sourceStream.parallel();
+ }
- return sourceStream;
+ return sourceStream;
+ } finally {
+ writeLock.unlock();
+ }
}
private static <T> Stream<T> toStream(Iterable<T> sourceIterable) {
@@ -458,13 +493,23 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
}
private boolean isReusable() {
- return null != sourceIterable;
+ readLock.lock();
+ try {
+ return null != sourceIterable;
+ } finally {
+ readLock.unlock();
+ }
}
private void makeReusable() {
- if (null != this.sourceIterable) return;
+ writeLock.lock();
+ try {
+ if (null != this.sourceIterable) return;
- this.sourceIterable = this.sourceStream.collect(Collectors.toList());
+ this.sourceIterable = this.sourceStream.collect(Collectors.toList());
+ } finally {
+ writeLock.unlock();
+ }
}
public Object asType(Class<?> clazz) {
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy
index f4c399a..34edcbc 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy
@@ -22,7 +22,7 @@ import groovy.transform.CompileStatic
import java.util.stream.Collectors
-import static Queryable.from
+import static org.apache.groovy.ginq.provider.collection.runtime.Queryable.from
/**
* Helper for {@link Queryable}
@@ -70,5 +70,18 @@ class QueryableHelper {
throw new TooManyValuesException("subquery returns more than one value: $list")
}
+ static void setVar(String name, Object value) {
+ VAR_HOLDER.get().put(name, value)
+ }
+
+ static Object getVar(String name) {
+ VAR_HOLDER.get().get(name)
+ }
+
+ static Object removeVar(String name) {
+ VAR_HOLDER.get().remove(name)
+ }
+
+ private static final ThreadLocal<Map<String, Object>> VAR_HOLDER = ThreadLocal.<Map<String, Object>>withInitial(() -> new LinkedHashMap<>())
private QueryableHelper() {}
}
diff --git a/subprojects/groovy-ginq/src/spec/doc/ginq-userguide.adoc b/subprojects/groovy-ginq/src/spec/doc/ginq-userguide.adoc
index 1d3b82d..917bc31 100644
--- a/subprojects/groovy-ginq/src/spec/doc/ginq-userguide.adoc
+++ b/subprojects/groovy-ginq/src/spec/doc/ginq-userguide.adoc
@@ -456,6 +456,13 @@ This is like `update` in SQL
include::../test/org/apache/groovy/ginq/GinqTest.groovy[tags=ginq_tips_07,indent=0]
----
+==== Parallel Querying
+Parallel querying is useful when querying large data sources. It is disabled by default, but it can be enabled explicitly:
+[source, groovy]
+----
+include::../test/org/apache/groovy/ginq/GinqTest.groovy[tags=ginq_tips_08,indent=0]
+----
+
==== Customize GINQ
For advanced users, you could customize GINQ behaviour by specifying your own target code generator.
diff --git a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
index 3ba1932..5834ce9 100644
--- a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
+++ b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
@@ -4419,6 +4419,32 @@ class GinqTest {
'''
}
+ @Test
+ void "testGinq - parallel - 1"() {
+ assertGinqScript '''
+// tag::ginq_tips_08[]
+ assert [[1, 1], [2, 2], [3, 3]] == GQ(parallel: true) {
+ from n1 in 1..1000
+ innerhashjoin n2 in 1..10000 on n2 == n1
+ where n1 <= 3 && n2 <= 5
+ select n1, n2
+ }.toList()
+// end::ginq_tips_08[]
+ '''
+ }
+
+ @Test
+ void "testGinq - parallel - 2"() {
+ assertGinqScript '''
+ assert [[1, 1], [2, 2], [3, 3]] == GQ(optimize: false, parallel: true) {
+ from n1 in 1..1000
+ innerhashjoin n2 in 1..10000 on n2 == n1
+ where n1 <= 3 && n2 <= 5
+ select n1, n2
+ }.toList()
+ '''
+ }
+
private static void assertGinqScript(String script) {
String deoptimizedScript = script.replaceAll(/\bGQ\s*[{]/, 'GQ(optimize:false) {')
List<String> scriptList = [deoptimizedScript, script]