Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/02 13:17:27 UTC

[GitHub] [flink] wuchong commented on a diff in pull request #18958: [FLINK-15854][hive] Use the new type inference for Hive UDTF

wuchong commented on code in PR #18958:
URL: https://github.com/apache/flink/pull/18958#discussion_r887910344


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionArguments.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.runtime.types.ClassLogicalTypeConverter;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+/** Stores argument information for a Hive function. */
+public class HiveFunctionArguments implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // input arguments -- store the value if an argument is literal, null otherwise
+    private final Object[] args;
+    // data type of each argument
+    private final DataType[] argTypes;
+    // store the indices of literal arguments, so that we can support null literals
+    private final BitSet literalIndices;
+
+    private HiveFunctionArguments(Object[] args, DataType[] argTypes, BitSet literalIndices) {
+        this.args = args;
+        this.argTypes = argTypes;
+        this.literalIndices = literalIndices;
+    }
+
+    public int size() {
+        return args.length;
+    }
+
+    public boolean isLiteral(int pos) {
+        return pos >= 0 && pos < args.length && literalIndices.get(pos);
+    }
+
+    public Object getArg(int pos) {
+        return args[pos];
+    }
+
+    public DataType getDataType(int pos) {
+        return argTypes[pos];
+    }
+
+    // create from a CallContext
+    public static HiveFunctionArguments create(CallContext callContext) {
+        DataType[] argTypes = callContext.getArgumentDataTypes().toArray(new DataType[0]);
+        Object[] args = new Object[argTypes.length];
+        BitSet literalIndices = new BitSet(args.length);
+        for (int i = 0; i < args.length; i++) {
+            if (callContext.isArgumentLiteral(i)) {
+                literalIndices.set(i);
+                args[i] =
+                        callContext
+                                .getArgumentValue(
+                                        i,
+                                        ClassLogicalTypeConverter.getDefaultExternalClassForType(

Review Comment:
   This method is deprecated and is in `flink-table-runtime`.
   I think you can simply call `argTypes[i].getLogicalType().getDefaultConversion()`? 
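   For illustration, a minimal sketch of the loop with that suggestion applied (assuming the surrounding code from the diff above; `CallContext.getArgumentValue` returns an `Optional`, hence the `orElse(null)`):

   ```java
   // Sketch of the suggested change: derive the default conversion class from the
   // argument's logical type instead of the deprecated converter utility.
   for (int i = 0; i < args.length; i++) {
       if (callContext.isArgumentLiteral(i)) {
           literalIndices.set(i);
           args[i] =
                   callContext
                           .getArgumentValue(
                                   i, argTypes[i].getLogicalType().getDefaultConversion())
                           .orElse(null);
       }
   }
   ```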



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java:
##########
@@ -19,38 +19,104 @@
 package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategy;
 
-/**
- * Interface for Hive simple udf, generic udf, and generic udtf. TODO: Note: this is only a
- * temporary interface for workaround when Flink type system and udf system rework is not finished.
- * Should adapt to Flink type system and Flink UDF framework later on.
- */
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Interface for Hive UDF, UDTF, UDAF. */
 @Internal
 public interface HiveFunction {
 
-    /**
-     * Set arguments and argTypes for Function instance. In this way, the correct method can be
-     * really deduced by the function instance.
-     *
-     * @param constantArguments arguments of a function call (only literal arguments are passed,
-     *     nulls for non-literal ones)
-     * @param argTypes types of arguments
-     */
-    void setArgumentTypesAndConstants(Object[] constantArguments, DataType[] argTypes);
+    /** Sets input arguments for the function. */
+    void setArguments(CallContext callContext);
 
     /**
-     * Get result type by arguments and argTypes.
-     *
-     * <p>We can't use getResultType(Object[], Class[]). The Class[] is the classes of what is
-     * defined in eval(), for example, if eval() is "public Integer eval(Double)", the argTypes
-     * would be Class[Double]. However, in our wrapper, the signature of eval() is "public Object
-     * eval(Object... args)", which means we cannot get any info from the interface.
+     * Infers the return type of the function. This method should only be invoked after {@link
+     * #setArguments(CallContext)} has been called.
      *
-     * @param constantArguments arguments of a function call (only literal arguments are passed,
-     *     nulls for non-literal ones)
-     * @param argTypes types of arguments
-     * @return result type.
+     * @return The inferred return type.
+     * @throws UDFArgumentException if the input arguments are invalid.
      */
-    DataType getHiveResultType(Object[] constantArguments, DataType[] argTypes);
+    DataType inferReturnType() throws UDFArgumentException;
+
+    /** Gets the wrapper for the Hive function. */
+    HiveFunctionWrapper getFunctionWrapper();

Review Comment:
   Declare the generic type `<UDFType>` in interface?
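   For reference, a sketch of what declaring the generic parameter on the interface might look like (a reviewer suggestion, not necessarily the code as merged; `UDFType` is the placeholder name from the comment, imports as in the diff above):

   ```java
   /** Interface for Hive UDF, UDTF, UDAF, typed by the wrapped Hive function class. */
   @Internal
   public interface HiveFunction<UDFType> {

       /** Sets input arguments for the function. */
       void setArguments(CallContext callContext);

       /** Infers the return type of the function. */
       DataType inferReturnType() throws UDFArgumentException;

       /** Gets the typed wrapper for the Hive function. */
       HiveFunctionWrapper<UDFType> getFunctionWrapper();
   }
   ```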



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java:
##########
@@ -2539,9 +2539,9 @@ private RelNode genUDTFPlan(
 
         SqlOperator convertedOperator = convertedCall.getOperator();
         Preconditions.checkState(
-                convertedOperator instanceof SqlUserDefinedTableFunction,
+                convertedOperator instanceof BridgingSqlFunction,
                 "Expect operator to be "
-                        + SqlUserDefinedTableFunction.class.getSimpleName()
+                        + BridgingSqlFunction.class.getSimpleName()

Review Comment:
   What's the reason for changing this?



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java:
##########
@@ -288,6 +290,15 @@ public void testUDTF() throws Exception {
                                     .collect());
             assertThat(results.toString()).isEqualTo("[+I[{1=a, 2=b}], +I[{3=c}]]");
 
+            assertThat(results.toString()).isEqualTo("[+I[{1=a, 2=b}], +I[{3=c}]]");

Review Comment:
   This assertion is redundant?


