You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@groovy.apache.org by su...@apache.org on 2020/12/14 16:14:34 UTC

[groovy] branch master updated: Support parallel querying in GINQ

This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/master by this push:
     new 63928d9  Support parallel querying in GINQ
63928d9 is described below

commit 63928d9dcb6894a8083bb4e6612eed0f3bc5f82a
Author: Daniel Sun <su...@apache.org>
AuthorDate: Tue Dec 15 00:05:54 2020 +0800

    Support parallel querying in GINQ
---
 .../apache/groovy/ginq/GinqGroovyMethods.groovy    |  9 ++-
 .../org/apache/groovy/ginq/dsl/GinqAstBuilder.java |  3 +
 .../ginq/provider/collection/GinqAstWalker.groovy  | 22 +++++-
 .../collection/runtime/QueryableCollection.java    | 89 ++++++++++++++++------
 .../collection/runtime/QueryableHelper.groovy      | 15 +++-
 .../groovy-ginq/src/spec/doc/ginq-userguide.adoc   |  7 ++
 .../test/org/apache/groovy/ginq/GinqTest.groovy    | 26 +++++++
 7 files changed, 143 insertions(+), 28 deletions(-)

diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/GinqGroovyMethods.groovy b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/GinqGroovyMethods.groovy
index f870080..6d0b54f 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/GinqGroovyMethods.groovy
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/GinqGroovyMethods.groovy
@@ -75,7 +75,7 @@ class GinqGroovyMethods {
      * Transform GINQ code to target method invocation
      *
      * @param ctx the macro context
-     * @param ginqConfigurationMapExpression specify the configuration for GINQ, e.g. {@code astWalker}, {@code optimize}
+     * @param ginqConfigurationMapExpression specify the configuration for GINQ, e.g. {@code astWalker}, {@code optimize}, {@code parallel}
      * @param ginqClosureExpression hold the GINQ code
      * @return target method invocation
      * @since 4.0.0
@@ -92,7 +92,7 @@ class GinqGroovyMethods {
 
         Map<String, String> configuration = createConfiguration(ginqConfigurationMapExpression)
 
-        if (TRUE == configuration.get(CONF_OPTIMIZE, TRUE)) {
+        if (TRUE_STR == configuration.get(CONF_OPTIMIZE, TRUE_STR)) {
             GinqAstOptimizer ginqAstOptimizer = new GinqAstOptimizer()
             ginqAstOptimizer.visitGinqExpression(ginqExpression)
         }
@@ -116,8 +116,9 @@ class GinqGroovyMethods {
 
     private GinqGroovyMethods() {}
 
-    private static final String CONF_OPTIMIZE = 'optimize'
+    public static final String CONF_PARALLEL = 'parallel'
     private static final String CONF_AST_WALKER = 'astWalker'
+    private static final String CONF_OPTIMIZE = 'optimize'
     private static final String DEFAULT_AST_WALKER_CLASS_NAME = GinqAstWalker.class.name
-    private static final String TRUE = 'true'
+    private static final String TRUE_STR = 'true'
 }
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/dsl/GinqAstBuilder.java b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/dsl/GinqAstBuilder.java
index 68d5cb7..72ecd03 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/dsl/GinqAstBuilder.java
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/dsl/GinqAstBuilder.java
@@ -57,6 +57,7 @@ import java.util.Set;
  * @since 4.0.0
  */
 public class GinqAstBuilder extends CodeVisitorSupport implements SyntaxErrorReportable {
+    public static final String ROOT_GINQ_EXPRESSION = "__ROOT_GINQ_EXPRESSION";
     private final Deque<GinqExpression> ginqExpressionStack = new ArrayDeque<>();
     private GinqExpression latestGinqExpression;
     private final SourceUnit sourceUnit;
@@ -88,6 +89,8 @@ public class GinqAstBuilder extends CodeVisitorSupport implements SyntaxErrorRep
                     methodCallExpression.getLineNumber(), methodCallExpression.getColumnNumber()));
         }
 
+        latestGinqExpression.putNodeMetaData(ROOT_GINQ_EXPRESSION, latestGinqExpression);
+
         return latestGinqExpression;
     }
 
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/GinqAstWalker.groovy b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/GinqAstWalker.groovy
index bdb1678..b5c4b2f 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/GinqAstWalker.groovy
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/GinqAstWalker.groovy
@@ -19,7 +19,9 @@
 package org.apache.groovy.ginq.provider.collection
 
 import groovy.transform.CompileStatic
+import org.apache.groovy.ginq.GinqGroovyMethods
 import org.apache.groovy.ginq.dsl.GinqAstBaseVisitor
+import org.apache.groovy.ginq.dsl.GinqAstBuilder
 import org.apache.groovy.ginq.dsl.GinqAstVisitor
 import org.apache.groovy.ginq.dsl.GinqSyntaxError
 import org.apache.groovy.ginq.dsl.SyntaxErrorReportable
@@ -84,8 +86,10 @@ import static org.codehaus.groovy.ast.tools.GeneralUtils.localVarX
 import static org.codehaus.groovy.ast.tools.GeneralUtils.param
 import static org.codehaus.groovy.ast.tools.GeneralUtils.params
 import static org.codehaus.groovy.ast.tools.GeneralUtils.propX
+import static org.codehaus.groovy.ast.tools.GeneralUtils.returnS
 import static org.codehaus.groovy.ast.tools.GeneralUtils.stmt
 import static org.codehaus.groovy.ast.tools.GeneralUtils.varX
+
 /**
  * Visit AST of GINQ to generate target method calls for GINQ
  *
@@ -166,6 +170,13 @@ class GinqAstWalker implements GinqAstVisitor<Expression>, SyntaxErrorReportable
         MethodCallExpression selectMethodCallExpression = this.visitSelectExpression(selectExpression)
 
         List<Statement> statementList = []
+
+        boolean isRootGinqExpression = ginqExpression === ginqExpression.getNodeMetaData(GinqAstBuilder.ROOT_GINQ_EXPRESSION)
+        boolean parallelEnabled = isRootGinqExpression && TRUE_STR == configuration.get(GinqGroovyMethods.CONF_PARALLEL)
+        if (parallelEnabled) {
+            statementList << stmt(callX(QUERYABLE_HELPER_TYPE, 'setVar', args(new ConstantExpression(PARALLEL), new ConstantExpression(TRUE_STR))))
+        }
+
         statementList << declS(
                 localVarX(metaDataMapName).tap {it.modifiers |= Opcodes.ACC_FINAL},
                 callX(MAPS_TYPE, "of", args(
@@ -177,7 +188,14 @@ class GinqAstWalker implements GinqAstVisitor<Expression>, SyntaxErrorReportable
         if (rowNumberUsed) {
             statementList << declS(localVarX(rowNumberName), new ConstantExpression(0L))
         }
-        statementList << stmt(selectMethodCallExpression)
+
+        final resultName = "__r${System.nanoTime()}"
+        statementList << declS(localVarX(resultName).tap {it.modifiers |= Opcodes.ACC_FINAL}, selectMethodCallExpression)
+
+        if (parallelEnabled) {
+            statementList << stmt(callX(QUERYABLE_HELPER_TYPE, 'removeVar', args(new ConstantExpression(PARALLEL))))
+        }
+        statementList << returnS(varX(resultName))
 
         def result = callX(lambdaX(block(statementList as Statement[])), "call")
 
@@ -1084,6 +1102,8 @@ class GinqAstWalker implements GinqAstVisitor<Expression>, SyntaxErrorReportable
     private static final String NAMEDRECORD_CLASS_NAME = NamedRecord.class.name
 
     private static final String EMPTY_STRING = ''
+    private static final String PARALLEL = 'parallel'
+    private static final String TRUE_STR = 'true'
 
     private static final String __METHOD_CALL_RECEIVER = "__METHOD_CALL_RECEIVER"
     private static final String __GROUPBY_VISITED = "__GROUPBY_VISITED"
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
index c8881ac..3c6ffb1 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
@@ -39,6 +39,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiPredicate;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -58,6 +61,11 @@ import static org.apache.groovy.ginq.provider.collection.runtime.Queryable.from;
 @Internal
 class QueryableCollection<T> implements Queryable<T>, Serializable {
     private static final long serialVersionUID = -5067092453136522893L;
+    public static final String PARALLEL = "parallel";
+    public static final String TRUE_STR = "true";
+    private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+    private final Lock readLock = rwl.readLock();
+    private final Lock writeLock = rwl.writeLock();
     private Iterable<T> sourceIterable;
     private Stream<T> sourceStream;
 
@@ -70,11 +78,16 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
     }
 
     public Iterator<T> iterator() {
-        if (null != sourceIterable) {
-            return sourceIterable.iterator();
-        }
+        readLock.lock();
+        try {
+            if (null != sourceIterable) {
+                return sourceIterable.iterator();
+            }
 
-        return sourceStream.iterator();
+            return sourceStream.iterator();
+        } finally {
+            readLock.unlock();
+        }
     }
 
     @Override
@@ -100,8 +113,7 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
         final Supplier<Map<Integer, List<U>>> hashTableSupplier = createHashTableSupplier(queryable, fieldsExtractor2);
         Stream<Tuple2<T, U>> stream = this.stream().flatMap(p -> {
             // build hash table
-            Map<Integer, List<U>> hashTable =
-                    hashTableHolder.getObject(hashTableSupplier);
+            Map<Integer, List<U>> hashTable = buildHashTable(hashTableHolder, hashTableSupplier);
 
             // probe the hash table
             return probeHashTable(hashTable, p, fieldsExtractor1, fieldsExtractor2);
@@ -111,7 +123,7 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
     }
 
     private static <U> Supplier<Map<Integer, List<U>>> createHashTableSupplier(Queryable<? extends U> queryable, Function<? super U, ?> fieldsExtractor2) {
-        return () -> queryable.stream().parallel()
+        return () -> queryable.stream()
                 .collect(
                         Collectors.toMap(
                                 c -> hash(fieldsExtractor2.apply(c)),
@@ -402,8 +414,7 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
         final Supplier<Map<Integer, List<U>>> hashTableSupplier = createHashTableSupplier(queryable2, fieldsExtractor2);
         Stream<Tuple2<T, U>> stream = queryable1.stream().flatMap(p -> {
             // build hash table
-            Map<Integer, List<U>> hashTable =
-                    hashTableHolder.getObject(hashTableSupplier);
+            Map<Integer, List<U>> hashTable = buildHashTable(hashTableHolder, hashTableSupplier);
 
             // probe the hash table
             List<Tuple2<T, U>> joinResultList =
@@ -415,6 +426,16 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
         return from(stream);
     }
 
+    private static <U> Map<Integer, List<U>> buildHashTable(final ObjectHolder<Map<Integer, List<U>>> hashTableHolder, final Supplier<Map<Integer, List<U>>> hashTableSupplier) {
+        Map<Integer, List<U>> hashTable = hashTableHolder.getObject();
+        if (null == hashTable) {
+            synchronized (hashTableHolder) {
+                hashTable = hashTableHolder.getObject(hashTableSupplier);
+            }
+        }
+        return hashTable;
+    }
+
     private static <T, U> Stream<Tuple2<T, U>> probeHashTable(Map<Integer, List<U>> hashTable, T p, Function<? super T, ?> fieldsExtractor1, Function<? super U, ?> fieldsExtractor2) {
         final Object otherFields = fieldsExtractor1.apply(p);
         return hashTable.entrySet().stream()
@@ -429,14 +450,19 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
 
     @Override
     public List<T> toList() {
-        if (sourceIterable instanceof List) {
-            return (List<T>) sourceIterable;
-        }
+        writeLock.lock();
+        try {
+            if (sourceIterable instanceof List) {
+                return (List<T>) sourceIterable;
+            }
 
-        final List<T> result = stream().collect(Collectors.toList());
-        sourceIterable = result;
+            final List<T> result = stream().collect(Collectors.toList());
+            sourceIterable = result;
 
-        return result;
+            return result;
+        } finally {
+            writeLock.unlock();
+        }
     }
 
     @Override
@@ -446,11 +472,20 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
 
     @Override
     public Stream<T> stream() {
-        if (isReusable()) {
-            sourceStream = toStream(sourceIterable);  // we have to create new stream every time because Java stream can not be reused
-        }
+        writeLock.lock();
+        try {
+            if (isReusable()) {
+                sourceStream = toStream(sourceIterable);  // we have to create new stream every time because Java stream can not be reused
+            }
+
+            if (!sourceStream.isParallel() && TRUE_STR.equals(QueryableHelper.getVar(PARALLEL))) {
+                sourceStream = sourceStream.parallel();
+            }
 
-        return sourceStream;
+            return sourceStream;
+        } finally {
+            writeLock.unlock();
+        }
     }
 
     private static <T> Stream<T> toStream(Iterable<T> sourceIterable) {
@@ -458,13 +493,23 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
     }
 
     private boolean isReusable() {
-        return null != sourceIterable;
+        readLock.lock();
+        try {
+            return null != sourceIterable;
+        } finally {
+            readLock.unlock();
+        }
     }
 
     private void makeReusable() {
-        if (null != this.sourceIterable) return;
+        writeLock.lock();
+        try {
+            if (null != this.sourceIterable) return;
 
-        this.sourceIterable = this.sourceStream.collect(Collectors.toList());
+            this.sourceIterable = this.sourceStream.collect(Collectors.toList());
+        } finally {
+            writeLock.unlock();
+        }
     }
 
     public Object asType(Class<?> clazz) {
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy
index f4c399a..34edcbc 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy
@@ -22,7 +22,7 @@ import groovy.transform.CompileStatic
 
 import java.util.stream.Collectors
 
-import static Queryable.from
+import static org.apache.groovy.ginq.provider.collection.runtime.Queryable.from
 
 /**
  * Helper for {@link Queryable}
@@ -70,5 +70,18 @@ class QueryableHelper {
         throw new TooManyValuesException("subquery returns more than one value: $list")
     }
 
+    static void setVar(String name, Object value) {
+        VAR_HOLDER.get().put(name, value)
+    }
+
+    static Object getVar(String name) {
+        VAR_HOLDER.get().get(name)
+    }
+
+    static Object removeVar(String name) {
+        VAR_HOLDER.get().remove(name)
+    }
+
+    private static final ThreadLocal<Map<String, Object>> VAR_HOLDER = ThreadLocal.<Map<String, Object>>withInitial(() -> new LinkedHashMap<>())
     private QueryableHelper() {}
 }
diff --git a/subprojects/groovy-ginq/src/spec/doc/ginq-userguide.adoc b/subprojects/groovy-ginq/src/spec/doc/ginq-userguide.adoc
index 1d3b82d..917bc31 100644
--- a/subprojects/groovy-ginq/src/spec/doc/ginq-userguide.adoc
+++ b/subprojects/groovy-ginq/src/spec/doc/ginq-userguide.adoc
@@ -456,6 +456,13 @@ This is like `update` in SQL
 include::../test/org/apache/groovy/ginq/GinqTest.groovy[tags=ginq_tips_07,indent=0]
 ----
 
+==== Parallel Querying
+Parallel querying is useful when querying large data sources. It is disabled by default, but it can be enabled explicitly with the `parallel` option:
+[source, groovy]
+----
+include::../test/org/apache/groovy/ginq/GinqTest.groovy[tags=ginq_tips_08,indent=0]
+----
+
 ==== Customize GINQ
 
 For advanced users, you could customize GINQ behaviour by specifying your own target code generator.
diff --git a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
index 3ba1932..5834ce9 100644
--- a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
+++ b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
@@ -4419,6 +4419,32 @@ class GinqTest {
         '''
     }
 
+    @Test
+    void "testGinq - parallel - 1"() {
+        assertGinqScript '''
+// tag::ginq_tips_08[]
+            assert [[1, 1], [2, 2], [3, 3]] == GQ(parallel: true) {
+                from n1 in 1..1000
+                innerhashjoin n2 in 1..10000 on n2 == n1
+                where n1 <= 3 && n2 <= 5
+                select n1, n2
+            }.toList()
+// end::ginq_tips_08[]
+        '''
+    }
+
+    @Test
+    void "testGinq - parallel - 2"() {
+        assertGinqScript '''
+            assert [[1, 1], [2, 2], [3, 3]] == GQ(optimize: false, parallel: true) {
+                from n1 in 1..1000
+                innerhashjoin n2 in 1..10000 on n2 == n1
+                where n1 <= 3 && n2 <= 5
+                select n1, n2
+            }.toList()
+        '''
+    }
+
     private static void assertGinqScript(String script) {
         String deoptimizedScript = script.replaceAll(/\bGQ\s*[{]/, 'GQ(optimize:false) {')
         List<String> scriptList = [deoptimizedScript, script]