Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/06 22:23:04 UTC

[GitHub] [spark] holdenk commented on a change in pull request #24559: [SPARK-27658][SQL] Add FunctionCatalog API

holdenk commented on a change in pull request #24559:
URL: https://github.com/apache/spark/pull/24559#discussion_r500624589



##########
File path: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/AggregateFunction.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * Interface for a function that produces a result value by aggregating over multiple input rows.
+ * <p>
+ * The JVM type of result values produced by this function must be the type used by Spark's
+ * InternalRow API for the {@link DataType SQL data type} returned by {@link #resultType()}.
+ *
+ * @param <S> the JVM type for the aggregation's intermediate state
+ * @param <R> the JVM type of result values
+ */
+public interface AggregateFunction<S, R> extends BoundFunction {
+
+  /**
+   * Initialize state for an aggregation.
+   * <p>
+   * This method is called one or more times for every group of values to initialize intermediate
+   * aggregation state. More than one intermediate aggregation state variable may be used when the
+   * aggregation is run in parallel tasks.
+   * <p>
+   * The object returned may be passed to {@link #update(Object, InternalRow)}
+   * and {@link #produceResult(Object)}. Implementations that return null must support null state
+   * passed into all other methods.
+   *
+   * @return a state instance or null
+   */
+  S newAggregationState();
+
+  /**
+   * Update the aggregation state with a new row.
+   * <p>
+   * This is called for each row in a group to update an intermediate aggregation state.
+   *
+   * @param state intermediate aggregation state
+   * @param input an input row
+   * @return updated aggregation state
+   */
+  S update(S state, InternalRow input);

Review comment:
       Would it make sense to add a default method that takes an iterator of `InternalRow`s and applies `update` to each one? Just thinking out loud.
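
To make the suggestion concrete, here is a rough sketch of what such a default method might look like. It is self-contained and uses stand-in types so it runs without Spark: `SimpleAggregate`, `updateAll`, and `LongSum` are hypothetical names, and a generic input type `I` takes the place of `InternalRow`. It is not the API proposed in the PR, only one possible shape for it.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorUpdateSketch {

  // Hypothetical stand-in for the AggregateFunction interface in this PR.
  // The type parameter I replaces InternalRow so the sketch has no Spark dependency.
  interface SimpleAggregate<S, I, R> {
    S newAggregationState();

    S update(S state, I input);

    R produceResult(S state);

    // Hypothetical default method: folds every input from the iterator into the
    // state by delegating to the per-row update. Implementations could override
    // it when a batch-oriented update is cheaper than row-at-a-time calls.
    default S updateAll(S state, Iterator<I> inputs) {
      S current = state;
      while (inputs.hasNext()) {
        current = update(current, inputs.next());
      }
      return current;
    }
  }

  // Example aggregate: sums Long inputs.
  static final class LongSum implements SimpleAggregate<Long, Long, Long> {
    @Override
    public Long newAggregationState() {
      return 0L;
    }

    @Override
    public Long update(Long state, Long input) {
      return state + input;
    }

    @Override
    public Long produceResult(Long state) {
      return state;
    }
  }

  // Runs the aggregate over a list using only the default iterator method.
  static long sum(List<Long> values) {
    LongSum fn = new LongSum();
    Long state = fn.updateAll(fn.newAggregationState(), values.iterator());
    return fn.produceResult(state);
  }

  public static void main(String[] args) {
    System.out.println(sum(Arrays.asList(1L, 2L, 3L))); // prints 6
  }
}
```

Because the default only delegates to `update`, existing implementations would keep working unchanged; whether the extra method is worth the added API surface is a separate question.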

##########
File path: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/AggregateFunction.java
##########
@@ -0,0 +1,68 @@
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * Interface for a function that produces a result value by aggregating over multiple input rows.

Review comment:
       Thinking back to our initial groupByKey implementation: can we add a warning here that if an implementation does not also implement `AssociativeAggregateFunction`, all values for a key will be forced onto a single node?
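
For context on why the warning matters, the sketch below contrasts the two evaluation strategies. It assumes that `AssociativeAggregateFunction` adds a `merge` step that combines two intermediate states; that signature is an assumption here, and `Aggregate`, `AssociativeAggregate`, `LongSum`, and both helper methods are hypothetical stand-ins rather than the interfaces in this PR. Nested lists stand in for partitions held by different tasks.

```java
import java.util.Arrays;
import java.util.List;

public class PartialAggregationSketch {

  // Hypothetical stand-in for AggregateFunction (generic input instead of InternalRow).
  interface Aggregate<S, I, R> {
    S newAggregationState();

    S update(S state, I input);

    R produceResult(S state);
  }

  // Hypothetical stand-in for AssociativeAggregateFunction; merge is assumed
  // to combine two partial states into one.
  interface AssociativeAggregate<S, I, R> extends Aggregate<S, I, R> {
    S merge(S left, S right);
  }

  static final class LongSum implements AssociativeAggregate<Long, Long, Long> {
    @Override
    public Long newAggregationState() {
      return 0L;
    }

    @Override
    public Long update(Long state, Long input) {
      return state + input;
    }

    @Override
    public Long merge(Long left, Long right) {
      return left + right;
    }

    @Override
    public Long produceResult(Long state) {
      return state;
    }
  }

  // Without merge: one state sees every value, so all rows for the key
  // have to be brought to the same place first.
  static <S, I, R> R aggregateOnSingleNode(Aggregate<S, I, R> fn, List<List<I>> partitions) {
    S state = fn.newAggregationState();
    for (List<I> partition : partitions) {
      for (I value : partition) {
        state = fn.update(state, value);
      }
    }
    return fn.produceResult(state);
  }

  // With merge: each partition builds its own partial state, and only the
  // (usually much smaller) states are combined afterwards.
  static <S, I, R> R aggregateWithMerge(
      AssociativeAggregate<S, I, R> fn, List<List<I>> partitions) {
    S merged = fn.newAggregationState();
    for (List<I> partition : partitions) {
      S partial = fn.newAggregationState();
      for (I value : partition) {
        partial = fn.update(partial, value);
      }
      merged = fn.merge(merged, partial);
    }
    return fn.produceResult(merged);
  }

  public static void main(String[] args) {
    List<List<Long>> partitions = Arrays.asList(Arrays.asList(1L, 2L), Arrays.asList(3L, 4L));
    LongSum fn = new LongSum();
    System.out.println(aggregateOnSingleNode(fn, partitions)); // prints 10
    System.out.println(aggregateWithMerge(fn, partitions)); // prints 10
  }
}
```

Both paths give the same result, but only the second lets the per-partition work happen where the data already lives, which is the difference the requested Javadoc warning would call out.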



