Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/29 07:42:17 UTC

[GitHub] [flink] aljoscha commented on a change in pull request #13007: [FLINK-15803][table] Update AggregateFunction and TableAggregateFunction to the new type system

aljoscha commented on a change in pull request #13007:
URL: https://github.com/apache/flink/pull/13007#discussion_r461628431



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
##########
@@ -357,12 +358,7 @@ else if (inputType instanceof CompositeType) {
 
 		// atomic in any case
 		if (fieldNames == null) {
-			int i = 0;
-			String fieldName = ATOMIC_FIELD_NAME;
-			while ((null != existingNames) && existingNames.contains(fieldName)) {
-				fieldName = ATOMIC_FIELD_NAME + "_" + i++;
-			}
-			fieldNames = Collections.singletonList(fieldName);
+			fieldNames = Collections.singletonList(getAtomicName(existingNames));

Review comment:
       Nice catch! 😃
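For readers without the full file in front of them: a rough sketch of what the extracted `getAtomicName(...)` helper presumably looks like, reconstructed from the inline loop removed in this hunk. The constant value and exact signature are assumptions, not taken from the PR.

```java
import java.util.List;

class FieldNaming {

	// Flink's default name for the single field of an atomic type (assumed to be "f0").
	private static final String ATOMIC_FIELD_NAME = "f0";

	// Reconstruction of the removed inline loop: append "_<counter>" until the
	// generated name no longer clashes with an already existing field name.
	static String getAtomicName(List<String> existingNames) {
		int i = 0;
		String fieldName = ATOMIC_FIELD_NAME;
		while (existingNames != null && existingNames.contains(fieldName)) {
			fieldName = ATOMIC_FIELD_NAME + "_" + i++;
		}
		return fieldName;
	}
}
```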

##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
##########
@@ -170,7 +170,13 @@ static StreamTableEnvironment create(StreamExecutionEnvironment executionEnviron
 	 * @param aggregateFunction The AggregateFunction to register.
 	 * @param <T> The type of the output value.
 	 * @param <ACC> The type of aggregate accumulator.
+	 *
+	 * @deprecated Use {@link #createTemporarySystemFunction(String, UserDefinedFunction)} instead. Please

Review comment:
       I like the thorough Javadoc here!
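To make the migration that the new `@deprecated` note points to concrete, a small sketch; `MyAgg` is a hypothetical `AggregateFunction` and is not defined here, `registerFunction` is the deprecated path and `createTemporarySystemFunction` the documented replacement.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Deprecated path: kind-specific overload backed by the legacy type extraction.
tableEnv.registerFunction("myAgg", new MyAgg());          // MyAgg: hypothetical AggregateFunction

// Replacement: single entry point for all UserDefinedFunction subclasses,
// resolved through the new type system.
tableEnv.createTemporarySystemFunction("myAgg", MyAgg.class);
```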

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
##########
@@ -19,39 +19,65 @@
 package org.apache.flink.table.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
 import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.types.extraction.TypeInferenceExtractor;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.util.Collector;
 
 /**
- * Base class for user-defined table aggregates.
- *
- * <p>The behavior of a {@link TableAggregateFunction} can be defined by implementing a series of
- * custom methods. A {@link TableAggregateFunction} needs at least three methods:
+ * Base class for a user-defined table aggregate function. A user-defined table aggregate function maps scalar
+ * values of multiple rows to zero, one, or multiple rows. If an output row consists of only one field,
+ * the row can be omitted and a scalar value can be emitted. It will be wrapped into an implicit row
+ * by the runtime.
+ *
+ * <p>Similar to an {@link AggregateFunction}, the behavior of an {@link TableAggregateFunction} is centered
+ * around the concept of an accumulator. The accumulator is an intermediate data structure that stores
+ * the aggregated values until a final aggregation result is computed.
+ *
+ * <p>For each set of rows that needs to be aggregated, the runtime will create an empty accumulator
+ * by calling the {@link #createAccumulator()}. Subsequently, the {@code accumulate()} method of the

Review comment:
    ```suggestion
     * by calling {@link #createAccumulator()}. Subsequently, the {@code accumulate()} method of the
    ```
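Since the new Javadoc centers on the accumulator lifecycle, a compact example may help readers following the thread. This mirrors the well-known Top2 table aggregate sketch; the class, field, and output names are illustrative rather than taken from the PR.

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Emits the two largest values seen so far, together with their position.
@FunctionHint(output = @DataTypeHint("ROW<v INT, pos INT>"))
public class Top2 extends TableAggregateFunction<Row, Top2.Top2Accumulator> {

	// Mutable intermediate state ("accumulator") managed by the runtime.
	public static class Top2Accumulator {
		public Integer first = Integer.MIN_VALUE;
		public Integer second = Integer.MIN_VALUE;
	}

	@Override
	public Top2Accumulator createAccumulator() {
		// invoked once per group to create the empty intermediate state
		return new Top2Accumulator();
	}

	// invoked for every input row of the group
	public void accumulate(Top2Accumulator acc, Integer value) {
		if (value == null) {
			return;
		}
		if (value > acc.first) {
			acc.second = acc.first;
			acc.first = value;
		} else if (value > acc.second) {
			acc.second = value;
		}
	}

	// invoked to emit the (updated) result; zero, one, or multiple rows may be collected
	public void emitValue(Top2Accumulator acc, Collector<Row> out) {
		if (acc.first != Integer.MIN_VALUE) {
			out.collect(Row.of(acc.first, 1));
		}
		if (acc.second != Integer.MIN_VALUE) {
			out.collect(Row.of(acc.second, 2));
		}
	}
}
```

With the new type system the output row type comes from the `@FunctionHint`/`@DataTypeHint` annotations imported in this hunk (or from reflective extraction), rather than from the legacy `getResultType()` override.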

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
##########
@@ -164,6 +168,33 @@ public static TableSchema expandCompositeTypeToSchema(DataType dataType) {
 		return tableSchema.getFieldDataType(name);
 	}
 
+	/**
+	 * Returns the data types of the flat representation of the given data type.
+	 */
+	public static List<DataType> flattenToDataTypes(DataType dataType) {
+		final LogicalType type = dataType.getLogicalType();
+		if (hasRoot(type, LogicalTypeRoot.DISTINCT_TYPE)) {
+			return flattenToDataTypes(dataType.getChildren().get(0));
+		} else if (isCompositeType(type)) {
+			return dataType.getChildren();
+		}
+		return Collections.singletonList(dataType);
+	}
+
+	/**
+	 * Returns the names of the flat representation of the given data type.
+	 */
+	public static List<String> flattenToNames(DataType dataType, List<String> existingNames) {
+		final LogicalType type = dataType.getLogicalType();

Review comment:
       Nitpick, but this could probably reuse `flattenToDataTypes()` from above for the actual traversal and then just convert the results to names.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
##########
@@ -206,10 +205,13 @@ private boolean verifyFunctionKind(
 			return false;
 		}
 
-		// it would be nice to give a more meaningful exception when a scalar function is used instead
-		// of a table function and vice versa, but we can do that only once FLIP-51 is implemented
+		final FunctionKind kind = definition.getKind();
 
-		if (definition.getKind() == FunctionKind.SCALAR) {
+		if (kind == FunctionKind.TABLE) {

Review comment:
       Question for Timo: what exactly is this verifying?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
##########
@@ -164,6 +168,33 @@ public static TableSchema expandCompositeTypeToSchema(DataType dataType) {
 		return tableSchema.getFieldDataType(name);
 	}
 
+	/**

Review comment:
       This only flattens the first level, right?
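Based on the implementation shown in the earlier hunk for this file, only direct children are returned. A small usage sketch, assuming the method lands with that signature (the wrapper class here is purely illustrative):

```java
import java.util.List;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;

public class FlattenExample {
	public static void main(String[] args) {
		DataType nested = DataTypes.ROW(
			DataTypes.FIELD("name", DataTypes.STRING()),
			DataTypes.FIELD("address", DataTypes.ROW(
				DataTypes.FIELD("street", DataTypes.STRING()),
				DataTypes.FIELD("zip", DataTypes.INT()))));

		// Only the first level is flattened: [STRING, ROW<street STRING, zip INT>]
		List<DataType> firstLevel = DataTypeUtils.flattenToDataTypes(nested);

		// Atomic types come back as a singleton list: [BIGINT]
		List<DataType> atomic = DataTypeUtils.flattenToDataTypes(DataTypes.BIGINT());

		System.out.println(firstLevel);
		System.out.println(atomic);
	}
}
```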

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
##########
@@ -19,39 +19,65 @@
 package org.apache.flink.table.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
 import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.types.extraction.TypeInferenceExtractor;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.util.Collector;
 
 /**
- * Base class for user-defined table aggregates.
- *
- * <p>The behavior of a {@link TableAggregateFunction} can be defined by implementing a series of
- * custom methods. A {@link TableAggregateFunction} needs at least three methods:
+ * Base class for a user-defined table aggregate function. A user-defined table aggregate function maps scalar

Review comment:
       Again, I really like the thorough new Javadoc!

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/SqlAggFunctionVisitor.java
##########
@@ -77,44 +84,72 @@ public SqlAggFunction visit(CallExpression call) {
 			defaultMethod(call);
 		}
 
-		FunctionDefinition def = call.getFunctionDefinition();
-		if (AGG_DEF_SQL_OPERATOR_MAPPING.containsKey(def)) {
-			return AGG_DEF_SQL_OPERATOR_MAPPING.get(def);
+		final FunctionDefinition definition = call.getFunctionDefinition();
+		if (AGG_DEF_SQL_OPERATOR_MAPPING.containsKey(definition)) {
+			return AGG_DEF_SQL_OPERATOR_MAPPING.get(definition);
 		}
-		if (BuiltInFunctionDefinitions.DISTINCT == def) {
+		if (BuiltInFunctionDefinitions.DISTINCT == definition) {
 			Expression innerAgg = call.getChildren().get(0);
 			return innerAgg.accept(this);
 		}
 
-		if (isFunctionOfKind(call, AGGREGATE)) {
-			AggregateFunctionDefinition aggDef = (AggregateFunctionDefinition) def;
-			AggregateFunction aggFunc = aggDef.getAggregateFunction();
-			FunctionIdentifier identifier = call.getFunctionIdentifier()
-				.orElse(FunctionIdentifier.of(aggFunc.functionIdentifier()));
+		return createSqlAggFunction(
+			call.getFunctionIdentifier().orElse(null),
+			call.getFunctionDefinition());
+	}
+
+	private SqlAggFunction createSqlAggFunction(@Nullable FunctionIdentifier identifier, FunctionDefinition definition) {
+		// legacy
+		if (definition instanceof AggregateFunctionDefinition) {
+			final AggregateFunctionDefinition aggDef = (AggregateFunctionDefinition) definition;

Review comment:
       That's the clean-code nerd in me, but I think all the if-else branches could become function calls, including the new code outside the if, i.e.:
   ```
   if (definition instanceof AggregateFunctionDefinition) {
       return translateLegacyAggregateFunction(...)
   } else if (...) {
       return translateLegacyTableAggregateFunction(...)
   }
   
   return translateFunction(...)
   ```

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
##########
@@ -445,6 +445,15 @@ public Integer visit(StructuredType structuredType) {
 		public Integer visit(DistinctType distinctType) {
 			return distinctType.getSourceType().accept(this);
 		}
+
+		@Override

Review comment:
       Stumbled across this: what does `defaultMethod` actually do?
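If it helps: as far as I recall, `LogicalTypeDefaultVisitor` routes every `visit(...)` overload into a single `defaultMethod(LogicalType)` fallback, so concrete visitors only override the logical types they actually care about. A condensed, illustrative subclass (the visitor name and return values are made up for the example):

```java
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;

// Illustrative visitor: returns the field count for rows and falls back to 1 otherwise.
class FieldCountVisitor extends LogicalTypeDefaultVisitor<Integer> {

	@Override
	public Integer visit(RowType rowType) {
		return rowType.getFieldCount();
	}

	@Override
	protected Integer defaultMethod(LogicalType logicalType) {
		// every visit(...) without a dedicated override above ends up here
		return 1;
	}
}
```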




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org