Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/12 08:20:55 UTC

[GitHub] [flink] luoyuxia commented on a diff in pull request #21401: [FLINK-29718][table] Supports hive sum function by native implementation

luoyuxia commented on code in PR #21401:
URL: https://github.com/apache/flink/pull/21401#discussion_r1045472367


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.flink.table.planner.utils.TableTestUtil.readFromResource;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for native hive agg function compatibility. */
+public class HiveDialectAggITCase extends HiveDialectITCaseBase {
+
+    @Test
+    public void testSumAggFunctionPlan() {
+        // test explain
+        String actualPlan = explainSql("select x, sum(y) from foo group by x");
+        assertThat(actualPlan).isEqualTo(readFromResource("/explain/testSumAggFunctionPlan.out"));
+    }
+
+    @Test
+    public void testSimpleSumAggFunction() throws Exception {
+        tableEnv.executeSql("create table test(x string, y string, z int, d decimal(10,5))");
+        tableEnv.executeSql(
+                        "insert into test values (NULL, '2', 1, 1.11), (NULL, NULL, 2, 2.22), (NULL, '4', 3, 3.33), (NULL, NULL, 4, 4.45)")
+                .await();
+
+        // test sum with all elements are null
+        List<Row> result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select sum(x) from test").collect());
+        assertThat(result.toString()).isEqualTo("[+I[null]]");
+
+        // test sum string type with partial element is null, result type is double
+        List<Row> result2 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select sum(y) from test").collect());
+        assertThat(result2.toString()).isEqualTo("[+I[6.0]]");
+
+        // TODO test sum string with some string can't convert to bigint after FLINK-30221

Review Comment:
   Just found that https://issues.apache.org/jira/browse/FLINK-30221 has been closed.
   If it's only the case for strings, we can fall back to Hive's own sum function, which won't use agg sort, but it's fine if the native implementation also uses agg sort.



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java:
##########
@@ -0,0 +1,101 @@
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.flink.table.planner.utils.TableTestUtil.readFromResource;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for native hive agg function compatibility. */
+public class HiveDialectAggITCase extends HiveDialectITCaseBase {
+
+    @Test
+    public void testSumAggFunctionPlan() {
+        // test explain
+        String actualPlan = explainSql("select x, sum(y) from foo group by x");
+        assertThat(actualPlan).isEqualTo(readFromResource("/explain/testSumAggFunctionPlan.out"));
+    }
+
+    @Test
+    public void testSimpleSumAggFunction() throws Exception {
+        tableEnv.executeSql("create table test(x string, y string, z int, d decimal(10,5))");
+        tableEnv.executeSql(
+                        "insert into test values (NULL, '2', 1, 1.11), (NULL, NULL, 2, 2.22), (NULL, '4', 3, 3.33), (NULL, NULL, 4, 4.45)")
+                .await();
+
+        // test sum with all elements are null
+        List<Row> result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select sum(x) from test").collect());
+        assertThat(result.toString()).isEqualTo("[+I[null]]");
+
+        // test sum string type with partial element is null, result type is double
+        List<Row> result2 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select sum(y) from test").collect());
+        assertThat(result2.toString()).isEqualTo("[+I[6.0]]");
+
+        // TODO test sum string with some string can't convert to bigint after FLINK-30221
+
+        // test decimal type
+        List<Row> result3 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select sum(d) from test").collect());
+        assertThat(result3.toString()).isEqualTo("[+I[11.11000]]");
+
+        // test sum int, result type is bigint
+        List<Row> result4 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select sum(z) from test").collect());
+        assertThat(result4.toString()).isEqualTo("[+I[10]]");
+
+        // test sum string&int type simultaneously
+        List<Row> result5 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select sum(y), sum(z) from test").collect());
+        assertThat(result5.toString()).isEqualTo("[+I[6.0, 10]]");
+
+        tableEnv.executeSql("drop table test");

Review Comment:
   Please also add tests for the float/double/timestamp data types.



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java:
##########
@@ -0,0 +1,101 @@
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.flink.table.planner.utils.TableTestUtil.readFromResource;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for native hive agg function compatibility. */
+public class HiveDialectAggITCase extends HiveDialectITCaseBase {
+
+    @Test
+    public void testSumAggFunctionPlan() {
+        // test explain
+        String actualPlan = explainSql("select x, sum(y) from foo group by x");
+        assertThat(actualPlan).isEqualTo(readFromResource("/explain/testSumAggFunctionPlan.out"));
+    }
+
+    @Test
+    public void testSimpleSumAggFunction() throws Exception {

Review Comment:
   Have you verified the following tests in Hive yourself to make sure the results are Hive-compatible?



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java:
##########
@@ -0,0 +1,101 @@
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.flink.table.planner.utils.TableTestUtil.readFromResource;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for native hive agg function compatibility. */
+public class HiveDialectAggITCase extends HiveDialectITCaseBase {

Review Comment:
   I don't think we need to extend `HiveDialectITCaseBase`. It's fine to duplicate some code here; I don't think it will be much.
   Otherwise, `HiveDialectAggITCase` will create useless tables.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeMerging;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.aggDecimalPlus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.cast;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral;
+
+/** built-in hive sum aggregate function. */
+public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction {
+
+    private final UnresolvedReferenceExpression sum = unresolvedRef("sum");
+    private DataType resultType;
+
+    @Override
+    public int operandCount() {
+        return 1;
+    }
+
+    @Override
+    public UnresolvedReferenceExpression[] aggBufferAttributes() {
+        return new UnresolvedReferenceExpression[] {sum};
+    }
+
+    @Override
+    public DataType[] getAggBufferTypes() {
+        return new DataType[] {getResultType()};
+    }
+
+    @Override
+    public DataType getResultType() {
+        return resultType;
+    }
+
+    @Override
+    public Expression[] initialValuesExpressions() {
+        return new Expression[] {/* sum = */ nullOf(getResultType())};
+    }
+
+    @Override
+    public Expression[] accumulateExpressions() {
+        return new Expression[] {
+            /* sum = */ ifThenElse(
+                    isNull(operand(0)),
+                    sum,
+                    ifThenElse(
+                            isNull(sum),
+                            tryCast(operand(0), typeLiteral(getResultType())),

Review Comment:
   FYI: FLINK-30221 has been closed. If the behavior is by design and won't be fixed, we should explicitly return 0 instead of null here.
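
   To illustrate the concern, here is a minimal plain-Java model of the nested `ifThenElse` above. It is not Flink code: `SumAccumulateSketch`, `tryCastToDouble` and the `zeroOnCastFailure` flag are hypothetical names, and treating a failed cast as 0 is only the variant suggested in this comment, not verified Hive behavior.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the accumulate expression; hypothetical, not Flink code. */
class SumAccumulateSketch {

    /** Models tryCast(string -> double): yields null when the string is not numeric. */
    static Double tryCastToDouble(String s) {
        try {
            return Double.valueOf(s);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /**
     * Mirrors the nested ifThenElse in accumulateExpressions().
     * zeroOnCastFailure = false: a failed cast stays null, as with a bare tryCast.
     * zeroOnCastFailure = true: a failed cast is mapped to 0 (the suggested variant).
     */
    static Double sum(List<String> values, boolean zeroOnCastFailure) {
        Double sum = null; // initial value: nullOf(resultType)
        for (String value : values) {
            if (value == null) {
                continue; // isNull(operand(0)) -> keep the current sum
            }
            Double casted = tryCastToDouble(value);
            if (casted == null && zeroOnCastFailure) {
                casted = 0.0;
            }
            if (sum == null) {
                sum = casted; // isNull(sum) -> take the casted operand
            } else {
                // SQL-style plus: the result is null if either side is null
                sum = (casted == null) ? null : sum + casted;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("2", "abc", "4");
        System.out.println(sum(input, false));
        System.out.println(sum(input, true));
    }
}
```

   In this model, a non-numeric string in the middle of the input resets the running sum to null, so the values accumulated before it are silently lost. Mapping the failed cast to 0 keeps the running sum intact.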



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java:
##########
@@ -0,0 +1,145 @@
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeMerging;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.aggDecimalPlus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.cast;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.tryCast;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral;
+
+/** built-in hive sum aggregate function. */
+public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction {
+
+    private final UnresolvedReferenceExpression sum = unresolvedRef("sum");
+    private DataType resultType;
+
+    @Override
+    public int operandCount() {
+        return 1;
+    }
+
+    @Override
+    public UnresolvedReferenceExpression[] aggBufferAttributes() {
+        return new UnresolvedReferenceExpression[] {sum};
+    }
+
+    @Override
+    public DataType[] getAggBufferTypes() {
+        return new DataType[] {getResultType()};
+    }
+
+    @Override
+    public DataType getResultType() {
+        return resultType;
+    }
+
+    @Override
+    public Expression[] initialValuesExpressions() {
+        return new Expression[] {/* sum = */ nullOf(getResultType())};
+    }
+
+    @Override
+    public Expression[] accumulateExpressions() {
+        return new Expression[] {
+            /* sum = */ ifThenElse(
+                    isNull(operand(0)),
+                    sum,
+                    ifThenElse(
+                            isNull(sum),
+                            tryCast(operand(0), typeLiteral(getResultType())),
+                            adjustedPlus(sum, tryCast(operand(0), typeLiteral(getResultType())))))
+        };
+    }
+
+    @Override
+    public Expression[] retractExpressions() {
+        throw new TableException("Sum aggregate function does not support retraction.");
+    }
+
+    @Override
+    public Expression[] mergeExpressions() {
+        return new Expression[] {
+            /* sum = */ ifThenElse(
+                    isNull(mergeOperand(sum)),
+                    sum,
+                    ifThenElse(
+                            isNull(sum), mergeOperand(sum), adjustedPlus(sum, mergeOperand(sum))))
+        };
+    }
+
+    @Override
+    public Expression getValueExpression() {
+        return sum;
+    }
+
+    @Override
+    public void setArguments(CallContext callContext) {
+        if (resultType == null) {
+            resultType = initResultType(callContext.getArgumentDataTypes().get(0));
+        }
+    }
+
+    private DataType initResultType(DataType argsType) {
+        switch (argsType.getLogicalType().getTypeRoot()) {
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+                return DataTypes.BIGINT();
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case FLOAT:
+            case DOUBLE:
+            case CHAR:
+            case VARCHAR:
+                return DataTypes.DOUBLE();
+            case DECIMAL:
+                DecimalType sumType =
+                        (DecimalType) LogicalTypeMerging.findSumAggType(argsType.getLogicalType());

Review Comment:
   Why use `LogicalTypeMerging.findSumAggType`?
   Is it equivalent to [Hive's logic](https://github.com/apache/hive/blob/10805bc997d7cd136b85fca9200cf165ffe2eae5/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java#L235)?
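
   A rough sketch of why I'm asking, with both rules written down from memory. Please verify them against the linked Hive source and against `LogicalTypeMerging`. The class and method names below are hypothetical, and both rules are assumptions: Hive appears to widen the input precision by 10 (capped at 38) for the initial aggregation, while `findSumAggType` appears to always use precision 38. Both keep the scale.

```java
/** Hypothetical comparison of decimal sum result types; both rules are assumptions to verify. */
class DecimalSumTypeSketch {

    static final int MAX_PRECISION = 38;

    /** Assumed Hive rule: precision + 10, capped at 38; scale unchanged. */
    static int[] hiveSumType(int precision, int scale) {
        return new int[] {Math.min(MAX_PRECISION, precision + 10), scale};
    }

    /** Assumed Flink findSumAggType rule: precision is always 38; scale unchanged. */
    static int[] flinkSumType(int precision, int scale) {
        return new int[] {MAX_PRECISION, scale};
    }

    static String format(int[] type) {
        return "decimal(" + type[0] + "," + type[1] + ")";
    }

    public static void main(String[] args) {
        // decimal(10,5) is the type of column d in the IT case
        System.out.println("hive:  " + format(hiveSumType(10, 5)));
        System.out.println("flink: " + format(flinkSumType(10, 5)));
    }
}
```

   If these rules are right, `sum(d)` on a `decimal(10,5)` column would be `decimal(20,5)` in Hive but `decimal(38,5)` here, so the result type would differ even when the values match.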


