Posted to commits@calcite.apache.org by GitBox <gi...@apache.org> on 2020/06/18 18:11:14 UTC

[GitHub] [calcite] amaliujia opened a new pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

amaliujia opened a new pull request #2035:
URL: https://github.com/apache/calcite/pull/2035


   …e (Rui Wang).
   
   
   
   See: https://jira.apache.org/jira/browse/CALCITE-4008


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [calcite] rubenada commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r443406186



##########
File path: core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregate.java
##########
@@ -90,6 +101,133 @@ public EnumerableSortedAggregate(
   }
 
   public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
-    throw Util.needToImplement("EnumerableSortedAggregate");
+    if (getGroupType() != Group.SIMPLE

Review comment:
       Would it be possible to extract these conditions into an auxiliary method `isSupported`?
   That way, `EnumerableSortedAggregateRule` could check it and, when the aggregate is not supported, not generate the `EnumerableSortedAggregate` at all (otherwise the rule would generate an operator that fails when it is implemented).
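
A minimal sketch of that idea, using hypothetical stand-in types rather than the real Calcite classes. `GroupType`, `SortedAggregateOp` and `convert` are assumptions made for illustration, and only the `SIMPLE` group-type condition visible in the diff is modeled; the remaining conditions are not shown in the hunk above and are left out.

```java
import java.util.Optional;

final class IsSupportedSketch {
  /** Hypothetical stand-in for the aggregate's group type. */
  enum GroupType { SIMPLE, ROLLUP, CUBE }

  /** Hypothetical stand-in for the physical operator. */
  static final class SortedAggregateOp {
    private final GroupType groupType;

    SortedAggregateOp(GroupType groupType) {
      this.groupType = groupType;
    }

    /** Single place that decides whether the operator can be implemented. */
    static boolean isSupported(GroupType groupType) {
      return groupType == GroupType.SIMPLE;
    }

    /** Still guards itself, but should no longer be reached for unsupported input. */
    String implement() {
      if (!isSupported(groupType)) {
        throw new UnsupportedOperationException("unsupported group type: " + groupType);
      }
      return "sorted aggregate over " + groupType + " group";
    }
  }

  /** Hypothetical rule entry point: produces no operator when it is unsupported. */
  static Optional<SortedAggregateOp> convert(GroupType groupType) {
    if (!SortedAggregateOp.isSupported(groupType)) {
      return Optional.empty();
    }
    return Optional.of(new SortedAggregateOp(groupType));
  }
}
```

With this shape the rule and the operator share one predicate, so the planner never ends up holding an operator whose `implement` is bound to throw.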







[GitHub] [calcite] rubenada commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r443451500



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {

Review comment:
       I think this implementation of `current` does not respect this part of the contract:
   ```
   This method does not move the position of the enumerator, and
   consecutive calls to {@code current} return the same object until either
   {@code moveNext} or {@code reset} is called.
   ```
   I have the impression that consecutive calls to `current` will return different results. I think the logic that we have here should be moved into `moveNext`, and the "current object" should be saved somehow; `current` should then do no processing and just return the "current object" saved by `moveNext`, or something like that.
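
A rough sketch of the shape described above, assuming a simplified cursor rather than the real linq4j `Enumerator` interface. `RunSumCursor` and its fields are hypothetical names, and the input is assumed to be sorted and free of nulls. All grouping work happens in `moveNext`, which saves the result; `current` only hands back the saved object.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class CachedCurrentSketch {
  /** Sums each run of equal adjacent integers, one run per moveNext call. */
  static final class RunSumCursor {
    private final Iterator<Integer> input;
    private Integer pending;    // first element of the next run, if already read
    private Integer curResult;  // result saved by the last successful moveNext
    private boolean hasCurrent;

    RunSumCursor(List<Integer> sortedInput) {
      this.input = sortedInput.iterator();
    }

    /** Does all the processing and stores the result for current(). */
    boolean moveNext() {
      if (pending == null) {
        if (!input.hasNext()) {
          hasCurrent = false;
          return false;
        }
        pending = input.next();
      }
      int key = pending;
      int sum = pending;
      pending = null;
      while (input.hasNext()) {
        int next = input.next();
        if (next != key) {
          // Key changed: remember the element that starts the next run.
          pending = next;
          break;
        }
        sum += next;
      }
      curResult = sum;
      hasCurrent = true;
      return true;
    }

    /** No processing: repeated calls return the same saved object. */
    Integer current() {
      if (!hasCurrent) {
        throw new NoSuchElementException();
      }
      return curResult;
    }
  }
}
```

For the input `1, 1, 2, 3, 3, 3`, calling `current` twice after the first `moveNext` gives the same sum both times, which is the part of the contract quoted above.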







[GitHub] [calcite] rubenada commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445351713



##########
File path: core/src/test/java/org/apache/calcite/test/enumerable/EnumerableSortedAggregateTest.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test.enumerable;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.test.JdbcTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Consumer;
+
+public class EnumerableSortedAggregateTest {
+  @Test void sortedAgg() {

Review comment:
       I would propose, e.g.:
   - group by X, Y (several columns)
   - group by X + order by X
   - group by X + order by Y







[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445733718



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {
+      if (curAccumulator == null) {
+        curAccumulator = accumulatorInitializer.apply();
+      }
+      TResult result = null;
+      TSource o = enumerator.current();
+      TKey prevKey = keySelector.apply(o);
+      curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+      while (enumerator.moveNext()) {
+        o = enumerator.current();
+        TKey curKey = keySelector.apply(o);

Review comment:
       Added this requirement to both the javadoc of `sortedGroupBy` and the relevant lines in `EnumerableSortedAggregate`.







[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445733828



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,132 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private boolean isLastMoveNextFalse;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+    private TResult curResult;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+      curResult = null;
+      isLastMoveNextFalse = false;
+    }
+
+    @Override public TResult current() {
+      if (isLastMoveNextFalse) {
+        throw new NoSuchElementException();
+      }
+      return curResult;
+    }
+
+    @Override public boolean moveNext() {
+      if (!isInitialized) {
+        isInitialized = true;
+        // input is empty
+        if (!enumerator.moveNext()) {
+          isLastMoveNextFalse = true;
+          return false;
+        }
+      } else if (isInitialized && curAccumulator == null) {
+        // input has been exhausted.
+        isLastMoveNextFalse = true;
+        return false;
+      }
+
+      if (curAccumulator == null) {
+        curAccumulator = accumulatorInitializer.apply();
+      }
+
+      // reset result because now it can move to next aggregated result.
+      curResult = null;
+      TSource o = enumerator.current();
+      TKey prevKey = keySelector.apply(o);
+      curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+      while (enumerator.moveNext()) {
+        o = enumerator.current();
+        TKey curKey = keySelector.apply(o);
+        if (comparator.compare(prevKey, curKey) != 0) {
+          // current key is different from previous key, get accumulated results and re-create
+          // accumulator for current key.
+          curResult = resultSelector.apply(prevKey, curAccumulator);
+          curAccumulator = accumulatorInitializer.apply();
+          break;
+        } else {

Review comment:
       Yes! Removed `else`







[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r442417140



##########
File path: core/src/test/java/org/apache/calcite/test/enumerable/EnumerableSortedAggregateTest.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test.enumerable;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.test.JdbcTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Consumer;
+
+public class EnumerableSortedAggregateTest {
+  @Test void sortedAgg() {

Review comment:
       R: @hsyuan, any suggestions on what extra tests would be useful for `EnumerableSortedAggregate`?







[GitHub] [calcite] rubenada commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r443486637



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {
+      if (curAccumulator == null) {
+        curAccumulator = accumulatorInitializer.apply();
+      }
+      TResult result = null;
+      TSource o = enumerator.current();
+      TKey prevKey = keySelector.apply(o);
+      curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+      while (enumerator.moveNext()) {
+        o = enumerator.current();
+        TKey curKey = keySelector.apply(o);
+        if (comparator.compare(prevKey, curKey) != 0) {
+          // current key is different from previous key, get accumulated results and re-create
+          // accumulator for current key.
+          result = resultSelector.apply(prevKey, curAccumulator);
+          curAccumulator = accumulatorInitializer.apply();
+          break;
+        } else {
+          curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+        }
+        prevKey = curKey;
+      }
+
+      if (result == null) {
+        // current key is the last key.
+        result = resultSelector.apply(prevKey, curAccumulator);
+        // no need to keep accumulator for the last key.
+        curAccumulator = null;
+      }
+
+      return result;
+    }
+
+    @Override public boolean moveNext() {
+      if (!isInitialized) {
+        isInitialized = true;
+        return enumerator.moveNext();
+      } else {
+        return curAccumulator != null;
+      }
+    }
+
+    @Override public void reset() {
+      enumerator.reset();

Review comment:
       Maybe we should also set `curAccumulator = null;` in `reset`.
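
To illustrate why carried-over state matters on `reset`, here is a small sketch under stated assumptions: `RunCountCursor`, `pendingKey` and `drain` are hypothetical names, not the PR's code, and the input is assumed to be sorted and free of nulls. The cursor reads one element ahead, so `reset` has to clear that look-ahead state as well as rewind the input.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class ResetSketch {
  /** Counts each run of equal adjacent keys, e.g. a, a, b gives "a=2", "b=1". */
  static final class RunCountCursor {
    private final List<String> sortedKeys;
    private Iterator<String> input;
    private String pendingKey;  // first key of the next run, read ahead
    private String curResult;   // saved by moveNext

    RunCountCursor(List<String> sortedKeys) {
      this.sortedKeys = sortedKeys;
      this.input = sortedKeys.iterator();
    }

    boolean moveNext() {
      if (pendingKey == null) {
        if (!input.hasNext()) {
          curResult = null;
          return false;
        }
        pendingKey = input.next();
      }
      String key = pendingKey;
      int count = 1;
      pendingKey = null;
      while (input.hasNext()) {
        String next = input.next();
        if (!next.equals(key)) {
          pendingKey = next;
          break;
        }
        count++;
      }
      curResult = key + "=" + count;
      return true;
    }

    String current() {
      return curResult;
    }

    /** Rewinds the input and clears every piece of state derived from it. */
    void reset() {
      input = sortedKeys.iterator();
      pendingKey = null;  // without this, a stale key would start the first run
      curResult = null;
    }
  }

  /** Collects all remaining results from the cursor. */
  static List<String> drain(RunCountCursor cursor) {
    List<String> out = new ArrayList<>();
    while (cursor.moveNext()) {
      out.add(cursor.current());
    }
    return out;
  }
}
```

If `reset` only rewound the input here, a cursor reset after its first `moveNext` would begin with the stale look-ahead key instead of the first element.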







[GitHub] [calcite] hsyuan commented on pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
hsyuan commented on pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#issuecomment-650560583


   @rubenada Thanks for your help.





[GitHub] [calcite] rubenada commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445352328



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,132 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private boolean isLastMoveNextFalse;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+    private TResult curResult;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+      curResult = null;
+      isLastMoveNextFalse = false;
+    }
+
+    @Override public TResult current() {
+      if (isLastMoveNextFalse) {
+        throw new NoSuchElementException();
+      }
+      return curResult;
+    }
+
+    @Override public boolean moveNext() {
+      if (!isInitialized) {
+        isInitialized = true;
+        // input is empty
+        if (!enumerator.moveNext()) {
+          isLastMoveNextFalse = true;
+          return false;
+        }
+      } else if (isInitialized && curAccumulator == null) {
+        // input has been exhausted.
+        isLastMoveNextFalse = true;
+        return false;
+      }
+
+      if (curAccumulator == null) {
+        curAccumulator = accumulatorInitializer.apply();
+      }
+
+      // reset result because now it can move to next aggregated result.
+      curResult = null;
+      TSource o = enumerator.current();
+      TKey prevKey = keySelector.apply(o);
+      curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+      while (enumerator.moveNext()) {
+        o = enumerator.current();
+        TKey curKey = keySelector.apply(o);
+        if (comparator.compare(prevKey, curKey) != 0) {
+          // current key is different from previous key, get accumulated results and re-create
+          // accumulator for current key.
+          curResult = resultSelector.apply(prevKey, curAccumulator);
+          curAccumulator = accumulatorInitializer.apply();
+          break;
+        } else {

Review comment:
       I think we can remove the `else`, because there is a `break` at the end of the `if` block. IMO that would make the code clearer.
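
As a generic illustration of the pattern (a hypothetical helper, not the PR's loop): once the `if` branch ends in `break`, the code after it only runs when the condition was false, so it does not need an `else` wrapper.

```java
import java.util.List;

final class BreakWithoutElseSketch {
  /** Returns the length of the leading run of equal elements. */
  static int leadingRunLength(List<Integer> values) {
    if (values.isEmpty()) {
      return 0;
    }
    int first = values.get(0);
    int length = 1;
    for (int i = 1; i < values.size(); i++) {
      if (values.get(i) != first) {
        // Value changed: the run is over.
        break;
      }
      // Reached only when the value is unchanged, so no else is needed.
      length++;
    }
    return length;
  }
}
```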







[GitHub] [calcite] rubenada commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445353311



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,132 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private boolean isLastMoveNextFalse;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+    private TResult curResult;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+      curResult = null;
+      isLastMoveNextFalse = false;
+    }
+
+    @Override public TResult current() {
+      if (isLastMoveNextFalse) {
+        throw new NoSuchElementException();
+      }
+      return curResult;
+    }
+
+    @Override public boolean moveNext() {
+      if (!isInitialized) {
+        isInitialized = true;
+        // input is empty
+        if (!enumerator.moveNext()) {
+          isLastMoveNextFalse = true;
+          return false;
+        }
+      } else if (isInitialized && curAccumulator == null) {

Review comment:
       I think checking `isInitialized` in this line is redundant. At this point `isInitialized` will always be true (otherwise we would have gone into the first `if` block).
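
To illustrate why the extra check can be dropped, here is a minimal sketch. `ElseIfSketch` and its two boolean parameters are hypothetical stand-ins for the enumerator's state, not the PR's code; the branch labels are placeholders.

```java
/** Hypothetical sketch; the two flags stand in for the enumerator's fields. */
final class ElseIfSketch {
  /** Shape of the condition as written in the diff. */
  static String original(boolean isInitialized, boolean accumulatorIsNull) {
    if (!isInitialized) {
      return "first branch";
    } else if (isInitialized && accumulatorIsNull) {
      // isInitialized is necessarily true here, so testing it adds nothing.
      return "second branch";
    }
    return "fall through";
  }

  /** Same logic with the redundant test removed. */
  static String simplified(boolean isInitialized, boolean accumulatorIsNull) {
    if (!isInitialized) {
      return "first branch";
    } else if (accumulatorIsNull) {
      return "second branch";
    }
    return "fall through";
  }
}
```

Both methods pick the same branch for all four combinations of the flags, which is why the shorter form should be safe.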







[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445734744



##########
File path: core/src/test/java/org/apache/calcite/test/enumerable/EnumerableSortedAggregateTest.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test.enumerable;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.test.JdbcTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Consumer;
+
+public class EnumerableSortedAggregateTest {
+  @Test void sortedAgg() {

Review comment:
       Missed this comment. Will add more tests.







[GitHub] [calcite] rubenada commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r444744863



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {

Review comment:
       Also a minor detail: per contract, `current` must throw `NoSuchElementException` if `moveNext` has not been called. In your case I think it would just return `null`.
   You can easily fix this by using a DUMMY object; see e.g. the `lazyCollectionSpool` method.
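
A minimal sketch of the sentinel idea, for illustration only. `SentinelEnumerator` and its `DUMMY` field are hypothetical names, and the class wraps a plain `java.util.Iterator` rather than implementing linq4j's `Enumerator`, so this is not how `lazyCollectionSpool` itself is written.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for an enumerator; not linq4j's Enumerator. */
final class SentinelEnumerator<T> {
  // Marker meaning "no current element"; compared by identity only,
  // so a legitimate null element is never confused with it.
  private static final Object DUMMY = new Object();

  private final Iterator<T> source;
  private Object current = DUMMY;

  SentinelEnumerator(Iterator<T> source) {
    this.source = source;
  }

  boolean moveNext() {
    if (source.hasNext()) {
      current = source.next();
      return true;
    }
    current = DUMMY; // exhausted: current() must throw again
    return false;
  }

  @SuppressWarnings("unchecked")
  T current() {
    if (current == DUMMY) {
      throw new NoSuchElementException();
    }
    return (T) current;
  }

  /** Returns true if current() throws NoSuchElementException. */
  static boolean throwsOnCurrent(SentinelEnumerator<?> e) {
    try {
      e.current();
      return false;
    } catch (NoSuchElementException ex) {
      return true;
    }
  }
}
```

With this shape, `current()` throws both before the first `moveNext()` and after the input is exhausted, while a `null` element is still returned as a normal value.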







[GitHub] [calcite] rubenada commented on pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#issuecomment-650038139


   Thanks for your work @amaliujia !
   I think the PR looks good (just need to squash commits).
   @hsyuan do you want to take a final look?





[GitHub] [calcite] rubenada commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r443446065



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {
+      if (curAccumulator == null) {
+        curAccumulator = accumulatorInitializer.apply();
+      }
+      TResult result = null;
+      TSource o = enumerator.current();
+      TKey prevKey = keySelector.apply(o);
+      curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+      while (enumerator.moveNext()) {
+        o = enumerator.current();
+        TKey curKey = keySelector.apply(o);

Review comment:
       Would it be possible for a key to be `null`? Do we need to add a check for that?
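
To make the concern concrete, here is a small sketch of grouping over already-sorted keys with a null-safe comparator. `SortedRunCounter` is a hypothetical helper, not part of linq4j, and wrapping the comparator in `Comparator.nullsFirst` is only one possible way to handle null keys; documenting that keys must be non-null is another.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper; counts runs of equal keys in already-sorted input. */
final class SortedRunCounter {
  /** Returns "key=count" for each run of keys the comparator treats as equal. */
  static <K> List<String> countRuns(List<K> sortedKeys, Comparator<K> comparator) {
    List<String> out = new ArrayList<>();
    boolean first = true;
    K prevKey = null;
    int count = 0;
    for (K key : sortedKeys) {
      if (!first && comparator.compare(prevKey, key) != 0) {
        out.add(prevKey + "=" + count); // key changed: emit the finished group
        count = 0;
      }
      prevKey = key;
      first = false;
      count++;
    }
    if (!first) {
      out.add(prevKey + "=" + count); // emit the last group
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> keys = Arrays.asList(null, null, "a", "b", "b");
    // Natural ordering alone would throw NullPointerException on the null keys.
    Comparator<String> nullSafe =
        Comparator.nullsFirst(Comparator.<String>naturalOrder());
    System.out.println(countRuns(keys, nullSafe)); // prints [null=2, a=1, b=2]
  }
}
```

The `first` flag, rather than a null check on `prevKey`, marks the start of the input, so a null key is not mistaken for "no previous key".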







[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445733718



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {
+      if (curAccumulator == null) {
+        curAccumulator = accumulatorInitializer.apply();
+      }
+      TResult result = null;
+      TSource o = enumerator.current();
+      TKey prevKey = keySelector.apply(o);
+      curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+      while (enumerator.moveNext()) {
+        o = enumerator.current();
+        TKey curKey = keySelector.apply(o);

Review comment:
       Added this requirement to both the javadoc of `sortedGroupBy` and the relevant lines in `EnumerableSortedAggregate`.







[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445784153



##########
File path: core/src/test/java/org/apache/calcite/test/enumerable/EnumerableSortedAggregateTest.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test.enumerable;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.test.JdbcTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Consumer;
+
+public class EnumerableSortedAggregateTest {
+  @Test void sortedAgg() {

Review comment:
       Thanks for your suggestion. I added a unit test for each of the suggested items.








[GitHub] [calcite] rubenada commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r443406186



##########
File path: core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregate.java
##########
@@ -90,6 +101,133 @@ public EnumerableSortedAggregate(
   }
 
   public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
-    throw Util.needToImplement("EnumerableSortedAggregate");
+    if (getGroupType() != Group.SIMPLE

Review comment:
       Would it be possible to extract these conditions into an auxiliary method `isSupported`?
   That way `EnumerableSortedAggregateRule` could check it and, if the aggregate is not supported, not generate the `EnumerableSortedAggregate` at all (otherwise the rule would generate an operator that fails when we try to implement it).
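
A rough sketch of what that extraction could look like, using simplified stand-in types instead of the real Calcite classes. `GroupKind`, `SortedAggregateSupport`, and the string returned by `convert` are all hypothetical; the condition is just the negation of the guard in `implement` (simple grouping and at least one aggregate call).

```java
/** Hypothetical stand-in for Calcite's Aggregate.Group. */
enum GroupKind { SIMPLE, ROLLUP, CUBE, OTHER }

final class SortedAggregateSupport {
  /** True when the aggregate has simple grouping and at least one agg call. */
  static boolean isSupported(GroupKind groupKind, int aggCallCount) {
    return groupKind == GroupKind.SIMPLE && aggCallCount > 0;
  }

  /**
   * Rule side of the sketch: returns null (meaning "no conversion") for
   * unsupported shapes, so no operator is created that would later fail.
   */
  static String convert(GroupKind groupKind, int aggCallCount) {
    if (!isSupported(groupKind, aggCallCount)) {
      return null;
    }
    return "EnumerableSortedAggregate";
  }
}
```

Keeping the check in one method means the rule and `implement` cannot drift apart on what counts as supported.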







[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r442417140



##########
File path: core/src/test/java/org/apache/calcite/test/enumerable/EnumerableSortedAggregateTest.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test.enumerable;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.test.JdbcTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Consumer;
+
+public class EnumerableSortedAggregateTest {
+  @Test void sortedAgg() {

Review comment:
       @hsyuan any suggestions on which extra tests would be useful for `EnumerableSortedAggregate`?







[GitHub] [calcite] amaliujia commented on pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#issuecomment-650995914


   @rubenada 
   I think this PR is ready to merge. I have rebased and squashed commits!





[GitHub] [calcite] rubenada commented on pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#issuecomment-646697215


   Sure, @hsyuan , I'll take a look





[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445734105



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,132 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private boolean isLastMoveNextFalse;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+    private TResult curResult;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+      curResult = null;
+      isLastMoveNextFalse = false;
+    }
+
+    @Override public TResult current() {
+      if (isLastMoveNextFalse) {
+        throw new NoSuchElementException();
+      }
+      return curResult;
+    }
+
+    @Override public boolean moveNext() {
+      if (!isInitialized) {
+        isInitialized = true;
+        // input is empty
+        if (!enumerator.moveNext()) {
+          isLastMoveNextFalse = true;
+          return false;
+        }
+      } else if (isInitialized && curAccumulator == null) {

Review comment:
       Yes! Removed the redundant `isInitialized` check. It is indeed always true after the first call to `moveNext`.







[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r442416752



##########
File path: core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregate.java
##########
@@ -90,6 +101,133 @@ public EnumerableSortedAggregate(
   }
 
   public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
-    throw Util.needToImplement("EnumerableSortedAggregate");
+    if (getGroupType() != Group.SIMPLE
+        || aggCalls.isEmpty()) {
+      throw Util.needToImplement("EnumerableSortedAggregate");
+    }
+
+    final JavaTypeFactory typeFactory = implementor.getTypeFactory();
+    final BlockBuilder builder = new BlockBuilder();
+    final EnumerableRel child = (EnumerableRel) getInput();
+    final Result result = implementor.visitChild(this, 0, child, pref);
+    Expression childExp =
+        builder.append(
+            "child",
+            result.block);
+
+    final PhysType physType =
+        PhysTypeImpl.of(
+            typeFactory, getRowType(), pref.preferCustom());
+
+    final PhysType inputPhysType = result.physType;
+
+    ParameterExpression parameter =
+        Expressions.parameter(inputPhysType.getJavaRowType(), "a0");
+
+    final PhysType keyPhysType =
+        inputPhysType.project(groupSet.asList(), getGroupType() != Group.SIMPLE,
+            JavaRowFormat.LIST);
+    final int groupCount = getGroupCount();
+
+    final List<AggImpState> aggs = new ArrayList<>(aggCalls.size());
+    for (Ord<AggregateCall> call : Ord.zip(aggCalls)) {
+      aggs.add(new AggImpState(call.i, call.e, false));
+    }
+
+    // Function0<Object[]> accumulatorInitializer =
+    //     new Function0<Object[]>() {
+    //         public Object[] apply() {
+    //             return new Object[] {0, 0};
+    //         }
+    //     };
+    final List<Expression> initExpressions = new ArrayList<>();
+    final BlockBuilder initBlock = new BlockBuilder();
+
+    final List<Type> aggStateTypes = createAggStateTypes(
+        initExpressions, initBlock, aggs, typeFactory);
+
+    final PhysType accPhysType =
+        PhysTypeImpl.of(typeFactory,
+            typeFactory.createSyntheticType(aggStateTypes));
+
+    declareParentAccumulator(initExpressions, initBlock, accPhysType);
+
+    final Expression accumulatorInitializer =
+        builder.append("accumulatorInitializer",
+            Expressions.lambda(
+                Function0.class,
+                initBlock.toBlock()));
+
+    // Function2<Object[], Employee, Object[]> accumulatorAdder =
+    //     new Function2<Object[], Employee, Object[]>() {
+    //         public Object[] apply(Object[] acc, Employee in) {
+    //              acc[0] = ((Integer) acc[0]) + 1;
+    //              acc[1] = ((Integer) acc[1]) + in.salary;
+    //             return acc;
+    //         }
+    //     };
+    final ParameterExpression inParameter =
+        Expressions.parameter(inputPhysType.getJavaRowType(), "in");
+    final ParameterExpression acc_ =
+        Expressions.parameter(accPhysType.getJavaRowType(), "acc");
+
+    createAccumulatorAdders(
+        inParameter, aggs, accPhysType, acc_, inputPhysType, builder, implementor, typeFactory);
+
+    final ParameterExpression lambdaFactory =
+        Expressions.parameter(AggregateLambdaFactory.class,
+            builder.newName("lambdaFactory"));
+
+    implementLambdaFactory(builder, inputPhysType, aggs, accumulatorInitializer,
+        false, lambdaFactory);
+
+    final BlockBuilder resultBlock = new BlockBuilder();
+    final List<Expression> results = Expressions.list();
+    final ParameterExpression key_;
+    final Type keyType = keyPhysType.getJavaRowType();
+    key_ = Expressions.parameter(keyType, "key");
+    for (int j = 0; j < groupCount; j++) {
+      final Expression ref = keyPhysType.fieldReference(key_, j);
+      results.add(ref);
+    }
+
+    for (final AggImpState agg : aggs) {
+      results.add(
+          agg.implementor.implementResult(agg.context,
+              new AggResultContextImpl(resultBlock, agg.call, agg.state, key_,
+                  keyPhysType)));
+    }
+    resultBlock.add(physType.record(results));
+
+    final Expression keySelector_ =
+        builder.append("keySelector",
+            inputPhysType.generateSelector(parameter,
+                groupSet.asList(),
+                keyPhysType.getFormat()));
+    // Generate the appropriate key Comparator.
+    final Expression comparator = keyPhysType.generateComparator(getTraitSet().getCollation());

Review comment:
       I will need to double-check this part and probably revise it so that the key selector matches the key comparator.




----------------------------------------------------------------



[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r444658637



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {
+      if (curAccumulator == null) {
+        curAccumulator = accumulatorInitializer.apply();
+      }
+      TResult result = null;
+      TSource o = enumerator.current();
+      TKey prevKey = keySelector.apply(o);
+      curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+      while (enumerator.moveNext()) {
+        o = enumerator.current();
+        TKey curKey = keySelector.apply(o);

Review comment:
       This is a good question.
   
   If we allow `null`, then the comparator should give a consistent ordering for null (e.g. https://stackoverflow.com/a/2401629/10055573).
   
   If we don't allow `null`, then we should indeed have a check somewhere.
   
   However, the problem is that I don't know whether all of those comparators give a consistent ordering for null.
   
   Also, note this existing code comment:
   ```
           // mergeJoin assumes inputs sorted in ascending order with nulls last,
           // if we reach a null key, we are done.
   ```
   
   I am actually not sure what the right resolution should be. What do you think?
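
To make the "consistent ordering for null" option concrete, here is a minimal sketch using only the JDK. `NullsLastKeys`, `ascendingNullsLast`, and `sameGroup` are hypothetical names for illustration, not part of Calcite, and this is not necessarily how the generated key comparator behaves. It only shows that a comparator built with `Comparator.nullsLast` treats two null keys as equal, so adjacent rows with null keys would fall into one group.

```java
import java.util.Comparator;

/** Hypothetical helper (not Calcite API): a key comparator with a fixed place for nulls. */
final class NullsLastKeys {
  private NullsLastKeys() {}

  /** Ascending natural order; null keys sort after all non-null keys and equal each other. */
  static <T extends Comparable<T>> Comparator<T> ascendingNullsLast() {
    return Comparator.nullsLast(Comparator.<T>naturalOrder());
  }

  /** Returns true if two adjacent keys belong to the same group under the comparator. */
  static <T> boolean sameGroup(Comparator<T> comparator, T prevKey, T curKey) {
    return comparator.compare(prevKey, curKey) == 0;
  }
}
```

With such a comparator the key-change check never dereferences a null key itself; whether the comparators generated for this operator give the same guarantee is exactly the open question above.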




----------------------------------------------------------------



[GitHub] [calcite] rubenada commented on pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#issuecomment-651017132


   LGTM


----------------------------------------------------------------



[GitHub] [calcite] rubenada merged pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada merged pull request #2035:
URL: https://github.com/apache/calcite/pull/2035


   


----------------------------------------------------------------



[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r446320901



##########
File path: core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregate.java
##########
@@ -90,6 +101,133 @@ public EnumerableSortedAggregate(
   }
 
   public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
-    throw Util.needToImplement("EnumerableSortedAggregate");
+    if (getGroupType() != Group.SIMPLE
+        || aggCalls.isEmpty()) {
+      throw Util.needToImplement("EnumerableSortedAggregate");
+    }
+
+    final JavaTypeFactory typeFactory = implementor.getTypeFactory();
+    final BlockBuilder builder = new BlockBuilder();
+    final EnumerableRel child = (EnumerableRel) getInput();
+    final Result result = implementor.visitChild(this, 0, child, pref);
+    Expression childExp =
+        builder.append(
+            "child",
+            result.block);
+
+    final PhysType physType =
+        PhysTypeImpl.of(
+            typeFactory, getRowType(), pref.preferCustom());
+
+    final PhysType inputPhysType = result.physType;
+
+    ParameterExpression parameter =
+        Expressions.parameter(inputPhysType.getJavaRowType(), "a0");
+
+    final PhysType keyPhysType =
+        inputPhysType.project(groupSet.asList(), getGroupType() != Group.SIMPLE,
+            JavaRowFormat.LIST);
+    final int groupCount = getGroupCount();
+
+    final List<AggImpState> aggs = new ArrayList<>(aggCalls.size());
+    for (Ord<AggregateCall> call : Ord.zip(aggCalls)) {
+      aggs.add(new AggImpState(call.i, call.e, false));
+    }
+
+    // Function0<Object[]> accumulatorInitializer =
+    //     new Function0<Object[]>() {
+    //         public Object[] apply() {
+    //             return new Object[] {0, 0};
+    //         }
+    //     };
+    final List<Expression> initExpressions = new ArrayList<>();
+    final BlockBuilder initBlock = new BlockBuilder();
+
+    final List<Type> aggStateTypes = createAggStateTypes(
+        initExpressions, initBlock, aggs, typeFactory);
+
+    final PhysType accPhysType =
+        PhysTypeImpl.of(typeFactory,
+            typeFactory.createSyntheticType(aggStateTypes));
+
+    declareParentAccumulator(initExpressions, initBlock, accPhysType);
+
+    final Expression accumulatorInitializer =
+        builder.append("accumulatorInitializer",
+            Expressions.lambda(
+                Function0.class,
+                initBlock.toBlock()));
+
+    // Function2<Object[], Employee, Object[]> accumulatorAdder =
+    //     new Function2<Object[], Employee, Object[]>() {
+    //         public Object[] apply(Object[] acc, Employee in) {
+    //              acc[0] = ((Integer) acc[0]) + 1;
+    //              acc[1] = ((Integer) acc[1]) + in.salary;
+    //             return acc;
+    //         }
+    //     };
+    final ParameterExpression inParameter =
+        Expressions.parameter(inputPhysType.getJavaRowType(), "in");
+    final ParameterExpression acc_ =
+        Expressions.parameter(accPhysType.getJavaRowType(), "acc");
+
+    createAccumulatorAdders(
+        inParameter, aggs, accPhysType, acc_, inputPhysType, builder, implementor, typeFactory);
+
+    final ParameterExpression lambdaFactory =
+        Expressions.parameter(AggregateLambdaFactory.class,
+            builder.newName("lambdaFactory"));
+
+    implementLambdaFactory(builder, inputPhysType, aggs, accumulatorInitializer,
+        false, lambdaFactory);
+
+    final BlockBuilder resultBlock = new BlockBuilder();
+    final List<Expression> results = Expressions.list();
+    final ParameterExpression key_;
+    final Type keyType = keyPhysType.getJavaRowType();
+    key_ = Expressions.parameter(keyType, "key");
+    for (int j = 0; j < groupCount; j++) {
+      final Expression ref = keyPhysType.fieldReference(key_, j);
+      results.add(ref);
+    }
+
+    for (final AggImpState agg : aggs) {
+      results.add(
+          agg.implementor.implementResult(agg.context,
+              new AggResultContextImpl(resultBlock, agg.call, agg.state, key_,
+                  keyPhysType)));
+    }
+    resultBlock.add(physType.record(results));
+
+    final Expression keySelector_ =
+        builder.append("keySelector",
+            inputPhysType.generateSelector(parameter,
+                groupSet.asList(),
+                keyPhysType.getFormat()));
+    // Generate the appropriate key Comparator.
+    final Expression comparator = keyPhysType.generateComparator(getTraitSet().getCollation());

Review comment:
       In fact, I cannot find a way to turn on top-down optimization via Hook.PLANNER.




----------------------------------------------------------------



[GitHub] [calcite] rubenada commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445347641



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {

Review comment:
       You're right. Re-reading the javadoc of `current` and its `throws` clauses, I find it a bit confusing, even contradictory, regarding this type of situation. In any case, this is out of the scope of the current PR, so IMO your `current` proposal is ok.




----------------------------------------------------------------



[GitHub] [calcite] amaliujia commented on pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#issuecomment-646846325


   Thank you!


----------------------------------------------------------------



[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445212926



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {

Review comment:
       Thanks for the example!
   
   I double-checked the contract and came to the following conclusion:
   ```
      * <p>After an enumerator is created or after the {@link #reset} method is
      * called, the {@link #moveNext} method must be called to advance the
      * enumerator to the first element of the collection before reading the
      * value of the {@code current} property; otherwise, {@code current} is
      * undefined.
   ```
   So in this case, returning `null` is fine, as the result is undefined.
   
   ```
      * <p>This method also throws {@link java.util.NoSuchElementException} if
      * the last call to {@code moveNext} returned {@code false}, which indicates
      * the end of the collection.
   ```
   I updated the code to make sure it throws `NoSuchElementException` in this case.
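
The two quoted rules can be sketched as a small state machine. This is an illustration under assumptions, not the code in this PR: `CachedCursor` is a hypothetical class that wraps a `java.util.List` instead of implementing linq4j's `Enumerator`. `current` returns `null` before the first `moveNext` (the "undefined" case) and throws `NoSuchElementException` once `moveNext` has returned `false`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for an enumerator; shows only the current()/moveNext() contract. */
final class CachedCursor<T> {
  private final Iterator<T> source;
  private T cached;      // value produced by the last successful moveNext()
  private boolean done;  // true once moveNext() has returned false

  CachedCursor(List<T> elements) {
    this.source = elements.iterator();
  }

  /** Advances to the next element and caches it; returns false at the end of the input. */
  boolean moveNext() {
    if (source.hasNext()) {
      cached = source.next();
      return true;
    }
    cached = null;
    done = true;
    return false;
  }

  /** Returns the cached element; repeated calls return the same object. */
  T current() {
    if (done) {
      throw new NoSuchElementException();
    }
    return cached; // null before the first moveNext(): the contract leaves this undefined
  }
}
```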
   
   




----------------------------------------------------------------



[GitHub] [calcite] hsyuan commented on pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
hsyuan commented on pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#issuecomment-646695520


   @rubenada Can you help review this pull request?


----------------------------------------------------------------



[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445213183



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {
+      if (curAccumulator == null) {
+        curAccumulator = accumulatorInitializer.apply();
+      }
+      TResult result = null;
+      TSource o = enumerator.current();
+      TKey prevKey = keySelector.apply(o);
+      curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+      while (enumerator.moveNext()) {
+        o = enumerator.current();
+        TKey curKey = keySelector.apply(o);

Review comment:
       Added a test for null values. Based on the test, the comparator puts null in the last position (ASC).




----------------------------------------------------------------



[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r444657447



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {

Review comment:
       This is a great point. Indeed, `current` should return the same object unless the enumerator moves to the next element or is reset.
   
   I moved a bunch of code to `moveNext`; `current` now only returns a cached result.
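
A rough sketch of that shape, assuming JDK functional interfaces (`Function`, `Supplier`, `BiFunction`) as stand-ins for linq4j's `Function1`, `Function0`, and `Function2`. `SortedGroupCursor` and its fields are hypothetical and this is not the code that was merged. `moveNext` consumes one whole group from the sorted input, reads one row ahead to detect the key change, and caches the result that `current` returns.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical sketch: groups an input that is already sorted by key, one group per moveNext(). */
final class SortedGroupCursor<S, K, A, R> {
  private final Iterator<S> input;
  private final Function<S, K> keySelector;
  private final Supplier<A> accumulatorInitializer;
  private final BiFunction<A, S, A> accumulatorAdder;
  private final BiFunction<K, A, R> resultSelector;
  private final Comparator<K> comparator;

  private S pending;          // first row of the next group, read ahead by moveNext()
  private boolean hasPending;
  private R cached;           // result of the group consumed by the last moveNext()
  private boolean done;

  SortedGroupCursor(Iterator<S> input, Function<S, K> keySelector,
      Supplier<A> accumulatorInitializer, BiFunction<A, S, A> accumulatorAdder,
      BiFunction<K, A, R> resultSelector, Comparator<K> comparator) {
    this.input = input;
    this.keySelector = keySelector;
    this.accumulatorInitializer = accumulatorInitializer;
    this.accumulatorAdder = accumulatorAdder;
    this.resultSelector = resultSelector;
    this.comparator = comparator;
  }

  /** Consumes the next group and caches its result; returns false when no rows remain. */
  boolean moveNext() {
    if (!hasPending) {
      if (!input.hasNext()) {
        cached = null;
        done = true;
        return false;
      }
      pending = input.next();
    }
    final K key = keySelector.apply(pending);
    A acc = accumulatorAdder.apply(accumulatorInitializer.get(), pending);
    hasPending = false;
    while (input.hasNext()) {
      S row = input.next();
      if (comparator.compare(key, keySelector.apply(row)) != 0) {
        // Key changed: keep this row as the start of the next group.
        pending = row;
        hasPending = true;
        break;
      }
      acc = accumulatorAdder.apply(acc, row);
    }
    cached = resultSelector.apply(key, acc);
    return true;
  }

  /** Returns the cached result without touching the input. */
  R current() {
    if (done) {
      throw new NoSuchElementException();
    }
    return cached;
  }
}
```

Keeping all of the input handling in `moveNext` means that calling `current` twice in a row cannot advance the underlying input, which was the problem with the original version.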

##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {
+      if (curAccumulator == null) {
+        curAccumulator = accumulatorInitializer.apply();
+      }
+      TResult result = null;
+      TSource o = enumerator.current();
+      TKey prevKey = keySelector.apply(o);
+      curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+      while (enumerator.moveNext()) {
+        o = enumerator.current();
+        TKey curKey = keySelector.apply(o);
+        if (comparator.compare(prevKey, curKey) != 0) {
+          // current key is different from previous key, get accumulated results and re-create
+          // accumulator for current key.
+          result = resultSelector.apply(prevKey, curAccumulator);
+          curAccumulator = accumulatorInitializer.apply();
+          break;
+        } else {
+          curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+        }
+        prevKey = curKey;
+      }
+
+      if (result == null) {
+        // current key is the last key.
+        result = resultSelector.apply(prevKey, curAccumulator);
+        // no need to keep accumulator for the last key.
+        curAccumulator = null;
+      }
+
+      return result;
+    }
+
+    @Override public boolean moveNext() {
+      if (!isInitialized) {
+        isInitialized = true;
+        return enumerator.moveNext();
+      } else {
+        return curAccumulator != null;
+      }
+    }
+
+    @Override public void reset() {
+      enumerator.reset();

Review comment:
       Done.







[GitHub] [calcite] rubenada commented on pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#issuecomment-650978346


   Thanks @hsyuan .
   @amaliujia do you think there are any other issues to be addressed? Could we squash the commits and merge?





[GitHub] [calcite] hsyuan edited a comment on pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
hsyuan edited a comment on pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#issuecomment-650560583


   @rubenada Thanks for your help.
   As long as you approve it, we are good to go.





[GitHub] [calcite] rubenada commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445353311



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,132 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private boolean isLastMoveNextFalse;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+    private TResult curResult;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+      curResult = null;
+      isLastMoveNextFalse = false;
+    }
+
+    @Override public TResult current() {
+      if (isLastMoveNextFalse) {
+        throw new NoSuchElementException();
+      }
+      return curResult;
+    }
+
+    @Override public boolean moveNext() {
+      if (!isInitialized) {
+        isInitialized = true;
+        // input is empty
+        if (!enumerator.moveNext()) {
+          isLastMoveNextFalse = true;
+          return false;
+        }
+      } else if (isInitialized && curAccumulator == null) {

Review comment:
       I think checking `isInitialized` in this line is redundant. At this point `isInitialized` will always be true (otherwise we would have gone into the first `if` block).







[GitHub] [calcite] rubenada commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r444738980



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {
+      if (curAccumulator == null) {
+        curAccumulator = accumulatorInitializer.apply();
+      }
+      TResult result = null;
+      TSource o = enumerator.current();
+      TKey prevKey = keySelector.apply(o);
+      curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+      while (enumerator.moveNext()) {
+        o = enumerator.current();
+        TKey curKey = keySelector.apply(o);

Review comment:
       NULLs should always come at the beginning or at the end, so I assume the current algorithm would work... if the comparator is able to support nulls. Otherwise, as you say, we would need to add an extra check for null values.
   I guess this would be a good scenario for a unit test.
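
   As a rough sketch of what such a test could exercise (names here are hypothetical, and this uses only the JDK rather than linq4j): the helper `countGroups` below walks sorted keys and counts key changes in the same way the enumerator detects group boundaries. A null-aware comparator such as `Comparator.nullsFirst(...)` handles a leading run of null keys, whereas `Comparator.naturalOrder()` throws a `NullPointerException` on the first null comparison.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class NullKeyComparison {
  /** Counts runs of equal adjacent keys, i.e. the number of groups in sorted input. */
  static <K> int countGroups(List<K> sortedKeys, Comparator<K> comparator) {
    if (sortedKeys.isEmpty()) {
      return 0;
    }
    int groups = 1;
    for (int i = 1; i < sortedKeys.size(); i++) {
      // A non-zero comparison between neighbours marks a group boundary.
      if (comparator.compare(sortedKeys.get(i - 1), sortedKeys.get(i)) != 0) {
        groups++;
      }
    }
    return groups;
  }

  public static void main(String[] args) {
    List<Integer> keys = Arrays.asList(null, null, 1, 1, 2);

    // Null-aware comparator: nulls sort first and compare equal to each other.
    Comparator<Integer> nullSafe =
        Comparator.nullsFirst(Comparator.<Integer>naturalOrder());
    System.out.println("groups: " + countGroups(keys, nullSafe));

    // Plain natural ordering dereferences the key and fails on null.
    try {
      countGroups(keys, Comparator.<Integer>naturalOrder());
    } catch (NullPointerException e) {
      System.out.println("plain comparator rejects null keys");
    }
  }
}
```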







[GitHub] [calcite] rubenada commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
rubenada commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445349359



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {
+      if (curAccumulator == null) {
+        curAccumulator = accumulatorInitializer.apply();
+      }
+      TResult result = null;
+      TSource o = enumerator.current();
+      TKey prevKey = keySelector.apply(o);
+      curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+      while (enumerator.moveNext()) {
+        o = enumerator.current();
+        TKey curKey = keySelector.apply(o);

Review comment:
       Thanks for adding the test; it is a good example.
   It might be worth adding a line to the `sortedGroupBy` javadoc specifying that _in case of null keys, the comparator must be able to support null values_ (or something along these lines).







[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r444658998



##########
File path: core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregate.java
##########
@@ -90,6 +101,133 @@ public EnumerableSortedAggregate(
   }
 
   public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
-    throw Util.needToImplement("EnumerableSortedAggregate");
+    if (getGroupType() != Group.SIMPLE

Review comment:
       That's a good point. In fact, I think I should follow https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregateRule.java#L41
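
   For reference, the shape of the suggestion could look roughly like the sketch below. Everything in it is a hypothetical stand-in (`GroupType`, `SortedAggregate`, `convert`), not the Calcite classes, and only the `SIMPLE` group-type check is taken from the quoted diff; the remaining conditions are elided there and are not reproduced. The idea is that the rule and the operator share one `isSupported` predicate, so the rule never produces an operator that later fails in `implement`.

```java
import java.util.Optional;

final class SupportCheckSketch {
  /** Hypothetical stand-in for the aggregate's group type. */
  enum GroupType { SIMPLE, ROLLUP, CUBE, OTHER }

  /** Hypothetical stand-in for the aggregate operator. */
  static final class SortedAggregate {
    final GroupType groupType;

    SortedAggregate(GroupType groupType) {
      this.groupType = groupType;
    }

    String implement() {
      // The operator re-checks the same predicate as a safety net.
      if (!isSupported(groupType)) {
        throw new IllegalStateException("unsupported aggregate");
      }
      return "generated code";
    }
  }

  /** Single predicate shared by the rule and the operator. */
  static boolean isSupported(GroupType groupType) {
    // Only the SIMPLE check is visible in the quoted diff;
    // any further conditions are elided there and omitted here.
    return groupType == GroupType.SIMPLE;
  }

  /** Hypothetical rule: yields no operator when the aggregate is not supported. */
  static Optional<SortedAggregate> convert(GroupType groupType) {
    return isSupported(groupType)
        ? Optional.of(new SortedAggregate(groupType))
        : Optional.empty();
  }
}
```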



