You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@groovy.apache.org by su...@apache.org on 2020/12/31 04:26:38 UTC

[groovy] branch master updated: Ensure the row number is generated sequentially

This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/master by this push:
     new cf5c86b  Ensure the row number is generated sequentially
cf5c86b is described below

commit cf5c86b8e07a54bc85c37a859860a743928c469e
Author: Daniel Sun <su...@apache.org>
AuthorDate: Thu Dec 31 12:26:14 2020 +0800

    Ensure the row number is generated sequentially
---
 .../collection/runtime/QueryableCollection.java    | 43 ++++++++++++++--------
 .../collection/runtime/QueryableHelper.groovy      | 10 ++---
 .../test/org/apache/groovy/ginq/GinqTest.groovy    | 11 ++++++
 3 files changed, 44 insertions(+), 20 deletions(-)

diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
index 60803cf..ade24b5 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
@@ -265,12 +265,21 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
 
     @Override
     public <U> Queryable<U> select(BiFunction<? super T, ? super Queryable<? extends T>, ? extends U> mapper) {
-        if (TRUE_STR.equals(QueryableHelper.getVar(USE_WINDOW_FUNCTION))) {
+        String originalParallel = null;
+        boolean useWindowFunction = TRUE_STR.equals(QueryableHelper.getVar(USE_WINDOW_FUNCTION));
+
+        if (useWindowFunction) {
+            originalParallel = QueryableHelper.getVar(PARALLEL);
+            QueryableHelper.setVar(PARALLEL, FALSE_STR); // ensure the row number is generated sequentially
             this.makeReusable();
         }
 
         Stream<U> stream = this.stream().map((T t) -> mapper.apply(t, this));
 
+        if (useWindowFunction) {
+            QueryableHelper.setVar(PARALLEL, originalParallel);
+        }
+
         return from(stream);
     }
 
@@ -501,20 +510,23 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
     public <U extends Comparable<? super U>> Window<T> over(Tuple2<T, Long> currentRecord, WindowDefinition<T, U> windowDefinition) {
         this.makeReusable();
         Queryable<Tuple2<T, Long>> partition =
-                from(Collections.singletonList(currentRecord)).innerHashJoin(partitionCache.computeIfAbsent(windowDefinition, wd -> {
-                    long[] rn = new long[] { 1L };
-                    List<Tuple2<T, Long>> listWithIndex =
-                            this.toList().stream()
-                                    .map(e -> Tuple.tuple(e, rn[0]++))
-                                    .collect(Collectors.toList());
-
-                    final Queryable<Tuple2<?, Queryable<Tuple2<T, Long>>>> q = from(listWithIndex).groupBy(wd.partitionBy().compose(Tuple2::getV1));
-                    if (q instanceof QueryableCollection) {
-                        ((QueryableCollection) q).makeReusable();
-                    }
-                    return q;
-                }), a -> windowDefinition.partitionBy().apply(a.getV1()), Tuple2::getV1)
-                        .select((e, q) -> e.getV2().getV2())
+                from(Collections.singletonList(currentRecord)).innerHashJoin(
+                        partitionCache.computeIfAbsent(windowDefinition, wd -> {
+                            long[] rn = new long[] { 1L };
+                            List<Tuple2<T, Long>> listWithIndex =
+                                    this.toList().stream()
+                                            .map(e -> Tuple.tuple(e, rn[0]++))
+                                            .collect(Collectors.toList());
+
+                            final Queryable<Tuple2<?, Queryable<Tuple2<T, Long>>>> q =
+                                    from(listWithIndex)
+                                            .groupBy(wd.partitionBy().compose(Tuple2::getV1));
+                            if (q instanceof QueryableCollection) {
+                                ((QueryableCollection) q).makeReusable();
+                            }
+                            return q;
+                        }), a -> windowDefinition.partitionBy().apply(a.getV1()), Tuple2::getV1
+                ).select((e, q) -> e.getV2().getV2())
                         .stream()
                         .findFirst()
                         .orElse(Queryable.emptyQueryable());
@@ -604,5 +616,6 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
     private static final String USE_WINDOW_FUNCTION = "useWindowFunction";
     private static final String PARALLEL = "parallel";
     private static final String TRUE_STR = "true";
+    private static final String FALSE_STR = "false";
     private static final long serialVersionUID = -5067092453136522893L;
 }
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy
index a91c95c..179d026 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy
@@ -70,16 +70,16 @@ class QueryableHelper {
         throw new TooManyValuesException("subquery returns more than one value: $list")
     }
 
-    static void setVar(String name, Object value) {
+    static <T> void setVar(String name, T value) {
         VAR_HOLDER.get().put(name, value)
     }
 
-    static Object getVar(String name) {
-        VAR_HOLDER.get().get(name)
+    static <T> T getVar(String name) {
+        (T) VAR_HOLDER.get().get(name)
     }
 
-    static Object removeVar(String name) {
-        VAR_HOLDER.get().remove(name)
+    static <T> T  removeVar(String name) {
+        (T) VAR_HOLDER.get().remove(name)
     }
 
     private static final ThreadLocal<Map<String, Object>> VAR_HOLDER = ThreadLocal.<Map<String, Object>> withInitial(() -> new LinkedHashMap<>())
diff --git a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
index 1a0b668..ba2fe62 100644
--- a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
+++ b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
@@ -5223,6 +5223,17 @@ class GinqTest {
         '''
     }
 
+    @Test
+    void "testGinq - window - 49"() {
+        assertGinqScript '''
+            assert [[0, 0], [1, 1], [2, 2], [3, 3], [4, 4],
+                    [5, 5], [6, 6], [7, 7], [8, 8], [9, 9]] == GQ(parallel:true) {
+                from n in 0..<10
+                select n, (rowNumber() over(orderby n))
+            }.toList()
+        '''
+    }
+
     private static void assertGinqScript(String script) {
         String deoptimizedScript = script.replaceAll(/\bGQ\s*[{]/, 'GQ(optimize:false) {')
         List<String> scriptList = [deoptimizedScript, script]