You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@groovy.apache.org by su...@apache.org on 2020/12/31 04:26:38 UTC
[groovy] branch master updated: Ensure the row number is generated sequentially
This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/groovy.git
The following commit(s) were added to refs/heads/master by this push:
new cf5c86b Ensure the row number is generated sequentially
cf5c86b is described below
commit cf5c86b8e07a54bc85c37a859860a743928c469e
Author: Daniel Sun <su...@apache.org>
AuthorDate: Thu Dec 31 12:26:14 2020 +0800
Ensure the row number is generated sequentially
---
.../collection/runtime/QueryableCollection.java | 43 ++++++++++++++--------
.../collection/runtime/QueryableHelper.groovy | 10 ++---
.../test/org/apache/groovy/ginq/GinqTest.groovy | 11 ++++++
3 files changed, 44 insertions(+), 20 deletions(-)
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
index 60803cf..ade24b5 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
@@ -265,12 +265,21 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
@Override
public <U> Queryable<U> select(BiFunction<? super T, ? super Queryable<? extends T>, ? extends U> mapper) {
- if (TRUE_STR.equals(QueryableHelper.getVar(USE_WINDOW_FUNCTION))) {
+ String originalParallel = null;
+ boolean useWindowFunction = TRUE_STR.equals(QueryableHelper.getVar(USE_WINDOW_FUNCTION));
+
+ if (useWindowFunction) {
+ originalParallel = QueryableHelper.getVar(PARALLEL);
+ QueryableHelper.setVar(PARALLEL, FALSE_STR); // ensure the row number is generated sequentially
this.makeReusable();
}
Stream<U> stream = this.stream().map((T t) -> mapper.apply(t, this));
+ if (useWindowFunction) {
+ QueryableHelper.setVar(PARALLEL, originalParallel);
+ }
+
return from(stream);
}
@@ -501,20 +510,23 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
public <U extends Comparable<? super U>> Window<T> over(Tuple2<T, Long> currentRecord, WindowDefinition<T, U> windowDefinition) {
this.makeReusable();
Queryable<Tuple2<T, Long>> partition =
- from(Collections.singletonList(currentRecord)).innerHashJoin(partitionCache.computeIfAbsent(windowDefinition, wd -> {
- long[] rn = new long[] { 1L };
- List<Tuple2<T, Long>> listWithIndex =
- this.toList().stream()
- .map(e -> Tuple.tuple(e, rn[0]++))
- .collect(Collectors.toList());
-
- final Queryable<Tuple2<?, Queryable<Tuple2<T, Long>>>> q = from(listWithIndex).groupBy(wd.partitionBy().compose(Tuple2::getV1));
- if (q instanceof QueryableCollection) {
- ((QueryableCollection) q).makeReusable();
- }
- return q;
- }), a -> windowDefinition.partitionBy().apply(a.getV1()), Tuple2::getV1)
- .select((e, q) -> e.getV2().getV2())
+ from(Collections.singletonList(currentRecord)).innerHashJoin(
+ partitionCache.computeIfAbsent(windowDefinition, wd -> {
+ long[] rn = new long[] { 1L };
+ List<Tuple2<T, Long>> listWithIndex =
+ this.toList().stream()
+ .map(e -> Tuple.tuple(e, rn[0]++))
+ .collect(Collectors.toList());
+
+ final Queryable<Tuple2<?, Queryable<Tuple2<T, Long>>>> q =
+ from(listWithIndex)
+ .groupBy(wd.partitionBy().compose(Tuple2::getV1));
+ if (q instanceof QueryableCollection) {
+ ((QueryableCollection) q).makeReusable();
+ }
+ return q;
+ }), a -> windowDefinition.partitionBy().apply(a.getV1()), Tuple2::getV1
+ ).select((e, q) -> e.getV2().getV2())
.stream()
.findFirst()
.orElse(Queryable.emptyQueryable());
@@ -604,5 +616,6 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
private static final String USE_WINDOW_FUNCTION = "useWindowFunction";
private static final String PARALLEL = "parallel";
private static final String TRUE_STR = "true";
+ private static final String FALSE_STR = "false";
private static final long serialVersionUID = -5067092453136522893L;
}
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy
index a91c95c..179d026 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy
@@ -70,16 +70,16 @@ class QueryableHelper {
throw new TooManyValuesException("subquery returns more than one value: $list")
}
- static void setVar(String name, Object value) {
+ static <T> void setVar(String name, T value) {
VAR_HOLDER.get().put(name, value)
}
- static Object getVar(String name) {
- VAR_HOLDER.get().get(name)
+ static <T> T getVar(String name) {
+ (T) VAR_HOLDER.get().get(name)
}
- static Object removeVar(String name) {
- VAR_HOLDER.get().remove(name)
+ static <T> T removeVar(String name) {
+ (T) VAR_HOLDER.get().remove(name)
}
private static final ThreadLocal<Map<String, Object>> VAR_HOLDER = ThreadLocal.<Map<String, Object>> withInitial(() -> new LinkedHashMap<>())
diff --git a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
index 1a0b668..ba2fe62 100644
--- a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
+++ b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
@@ -5223,6 +5223,17 @@ class GinqTest {
'''
}
+ @Test
+ void "testGinq - window - 49"() {
+ assertGinqScript '''
+ assert [[0, 0], [1, 1], [2, 2], [3, 3], [4, 4],
+ [5, 5], [6, 6], [7, 7], [8, 8], [9, 9]] == GQ(parallel:true) {
+ from n in 0..<10
+ select n, (rowNumber() over(orderby n))
+ }.toList()
+ '''
+ }
+
private static void assertGinqScript(String script) {
String deoptimizedScript = script.replaceAll(/\bGQ\s*[{]/, 'GQ(optimize:false) {')
List<String> scriptList = [deoptimizedScript, script]