Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/06 16:30:31 UTC

[GitHub] [flink] lsyldliu commented on a diff in pull request #19561: [FLINK-26414][hive] Hive dialect supports macro

lsyldliu commented on code in PR #19561:
URL: https://github.com/apache/flink/pull/19561#discussion_r890126279


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/planner/delegation/hive/HiveASTParserTest.java:
##########
@@ -139,6 +139,13 @@ public void testFunction() throws Exception {
         assertDDLType(HiveASTParser.TOK_SHOWFUNCTIONS, "show functions");
     }
 
+    @Test
+    public void testMacro() throws Exception {

Review Comment:
   It would be better if we added a test that creates a macro with a qualified name.
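   For example, something like this (only a sketch; the test name and the hypothetical `assertSemanticException` helper are assumptions, not existing test utilities):

       @Test
       public void testMacroWithQualifiedName() {
           // a macro created with a qualified name such as db.my_macro should be rejected
           assertSemanticException("create temporary macro db.my_macro (x int) x * x");
       }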



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java:
##########
@@ -506,6 +532,106 @@ private Operation convertCreateFunction(HiveParserASTNode ast) {
         }
     }
 
+    private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException {
+        String macroName = ast.getChild(0).getText();
+        if (FunctionUtils.isQualifiedFunctionName(macroName)) {
+            throw new SemanticException("Temporary macro cannot be created with a qualified name.");

Review Comment:
   According to the method `FunctionUtils.isQualifiedFunctionName`, this error message may not be clear to users, so I think we can improve it.
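   For instance, something like this (just a sketch; the exact wording is an assumption):

       // FunctionUtils.isQualifiedFunctionName only checks whether the name contains a dot,
       // so surface the offending name and the reason in the message
       throw new SemanticException(
               String.format(
                       "CREATE TEMPORARY MACRO doesn't allow \".\" in the macro name, but the name is \"%s\".",
                       macroName));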



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java:
##########
@@ -36,23 +38,48 @@
     public static final long serialVersionUID = 393313529306818205L;
 
     private final String className;
+    // A field to hold the serialized string of the UDF.
+    // We need to hold it because some serializable UDFs, such as Hive's GenericUDFMacro,
+    // carry additional state; if we construct the UDF directly via
+    // getUDFClass#newInstance, that state would be lost.
+    private String udfSerializedString;
 
     private transient UDFType instance = null;
 
     public HiveFunctionWrapper(String className) {
         this.className = className;
     }
 
+    /**
+     * Create a HiveFunctionWrapper with a UDF instance. In this constructor, the instance will be
+     * serialized to a string and held in the HiveFunctionWrapper.
+     */
+    public HiveFunctionWrapper(String className, UDFType serializableInstance) {
+        this.className = className;

Review Comment:
   we can call it directly `this(className);` 
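   I.e., a minimal sketch of the suggested delegation (the elided serialization step stays as in the PR):

       public HiveFunctionWrapper(String className, UDFType serializableInstance) {
           this(className); // reuse the single-argument constructor instead of duplicating the assignment
           // ... serialize serializableInstance into udfSerializedString as the PR already does ...
       }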



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java:
##########
@@ -506,6 +532,106 @@ private Operation convertCreateFunction(HiveParserASTNode ast) {
         }
     }
 
+    private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException {
+        String macroName = ast.getChild(0).getText();
+        if (FunctionUtils.isQualifiedFunctionName(macroName)) {
+            throw new SemanticException("Temporary macro cannot be created with a qualified name.");
+        }
+
+        List<FieldSchema> arguments = getColumns((HiveParserASTNode) ast.getChild(1), true);

Review Comment:
   The readability of `arguments` is not good. IMO, it refers to all the columns of the table? In other words, we should add a comment explaining it.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java:
##########
@@ -88,4 +115,20 @@ public String getClassName() {
     public Class<UDFType> getUDFClass() throws ClassNotFoundException {
         return (Class<UDFType>) Thread.currentThread().getContextClassLoader().loadClass(className);
     }
+
+    /**
+     * Deserialize the UDF using the held udfSerializedString.
+     *
+     * @return the deserialized UDF
+     */
+    private UDFType deserializeUDF() {
+        try {
+            return (UDFType)
+                    SerializationUtilities.deserializeObject(
+                            udfSerializedString, (Class<Serializable>) getUDFClass());
+        } catch (ClassNotFoundException e) {
+            throw new FlinkHiveUDFException(
+                    String.format("Failed to deserialize function %s", className), e);
+        }

Review Comment:
   "Failed to deserialize function %s."



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java:
##########
@@ -506,6 +532,106 @@ private Operation convertCreateFunction(HiveParserASTNode ast) {
         }
     }
 
+    private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException {
+        String macroName = ast.getChild(0).getText();
+        if (FunctionUtils.isQualifiedFunctionName(macroName)) {
+            throw new SemanticException("Temporary macro cannot be created with a qualified name.");
+        }
+
+        List<FieldSchema> arguments = getColumns((HiveParserASTNode) ast.getChild(1), true);
+        Set<String> actualColumnNames = getActualColumnNames(ast, arguments);
+
+        HiveParserRowResolver rowResolver = new HiveParserRowResolver();
+        List<String> macroColumnNames = new ArrayList<>();
+        List<TypeInfo> macroColumnTypes = new ArrayList<>();
+        getMacroColumnData(
+                arguments, actualColumnNames, rowResolver, macroColumnNames, macroColumnTypes);
+        ExprNodeDesc body = getBody(ast, arguments, rowResolver);
+
+        GenericUDFMacro macro =
+                new GenericUDFMacro(macroName, body, macroColumnNames, macroColumnTypes);
+
+        FunctionDefinition macroDefinition =
+                new HiveGenericUDF(
+                        new HiveFunctionWrapper<>(GenericUDFMacro.class.getName(), macro),
+                        hiveShim);
+        // hive's macro is more like flink's temp system function
+        return new CreateTempSystemFunctionOperation(macroName, false, macroDefinition);
+    }
+
+    private Set<String> getActualColumnNames(HiveParserASTNode ast, List<FieldSchema> arguments)
+            throws SemanticException {
+        final Set<String> actualColumnNames = new HashSet<>();
+
+        if (!arguments.isEmpty()) {
+            // Walk down expression to see which arguments are actually used.
+            Node expression = (Node) ast.getChild(2);
+
+            PreOrderWalker walker =
+                    new PreOrderWalker(
+                            (nd, stack, nodeOutputs) -> {
+                                if (nd instanceof HiveParserASTNode) {
+                                    HiveParserASTNode node = (HiveParserASTNode) nd;
+                                    if (node.getType() == HiveASTParser.TOK_TABLE_OR_COL) {
+                                        actualColumnNames.add(node.getChild(0).getText());
+                                    }
+                                }
+                                return null;
+                            });
+            walker.startWalking(Collections.singleton(expression), null);
+        }
+        return actualColumnNames;
+    }
+
+    private void getMacroColumnData(
+            List<FieldSchema> arguments,
+            Set<String> actualColumnNames,
+            HiveParserRowResolver rowResolver,
+            List<String> macroColumnNames,
+            List<TypeInfo> macroColumnTypes)
+            throws SemanticException {
+        for (FieldSchema argument : arguments) {
+            TypeInfo columnType = TypeInfoUtils.getTypeInfoFromTypeString(argument.getType());
+            rowResolver.put(
+                    "",
+                    argument.getName(),
+                    new ColumnInfo(argument.getName(), columnType, "", false));
+            macroColumnNames.add(argument.getName());
+            macroColumnTypes.add(columnType);
+        }
+        Set<String> expectedColumnNames = new LinkedHashSet<>(macroColumnNames);
+        if (!expectedColumnNames.equals(actualColumnNames)) {
+            throw new SemanticException(
+                    "Expected columns " + expectedColumnNames + " but found " + actualColumnNames);
+        }
+        if (expectedColumnNames.size() != macroColumnNames.size()) {

Review Comment:
   What about the `expectedColumnNames.size() > macroColumnNames.size()` and `expectedColumnNames.size() < macroColumnNames.size()` cases? Is the error message the same for both?
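   Note that `expectedColumnNames` is a `LinkedHashSet` built from `macroColumnNames`, so its size can only be smaller than or equal to the list's, never larger; the sizes differ exactly when the macro declares a duplicate column name. A dedicated message could make that explicit (a sketch; the wording is an assumption):

       if (expectedColumnNames.size() != macroColumnNames.size()) {
           // the set deduplicated at least one name, i.e. a parameter was declared twice
           throw new SemanticException(
                   "At least one parameter name was used more than once: " + macroColumnNames);
       }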



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java:
##########
@@ -506,6 +532,106 @@ private Operation convertCreateFunction(HiveParserASTNode ast) {
         }
     }
 
+    private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException {
+        String macroName = ast.getChild(0).getText();
+        if (FunctionUtils.isQualifiedFunctionName(macroName)) {
+            throw new SemanticException("Temporary macro cannot be created with a qualified name.");
+        }
+
+        List<FieldSchema> arguments = getColumns((HiveParserASTNode) ast.getChild(1), true);
+        Set<String> actualColumnNames = getActualColumnNames(ast, arguments);
+
+        HiveParserRowResolver rowResolver = new HiveParserRowResolver();
+        List<String> macroColumnNames = new ArrayList<>();
+        List<TypeInfo> macroColumnTypes = new ArrayList<>();
+        getMacroColumnData(
+                arguments, actualColumnNames, rowResolver, macroColumnNames, macroColumnTypes);
+        ExprNodeDesc body = getBody(ast, arguments, rowResolver);
+
+        GenericUDFMacro macro =
+                new GenericUDFMacro(macroName, body, macroColumnNames, macroColumnTypes);
+
+        FunctionDefinition macroDefinition =
+                new HiveGenericUDF(
+                        new HiveFunctionWrapper<>(GenericUDFMacro.class.getName(), macro),
+                        hiveShim);
+        // hive's macro is more like flink's temp system function
+        return new CreateTempSystemFunctionOperation(macroName, false, macroDefinition);
+    }
+
+    private Set<String> getActualColumnNames(HiveParserASTNode ast, List<FieldSchema> arguments)
+            throws SemanticException {
+        final Set<String> actualColumnNames = new HashSet<>();
+
+        if (!arguments.isEmpty()) {
+            // Walk down expression to see which arguments are actually used.
+            Node expression = (Node) ast.getChild(2);
+
+            PreOrderWalker walker =
+                    new PreOrderWalker(
+                            (nd, stack, nodeOutputs) -> {
+                                if (nd instanceof HiveParserASTNode) {
+                                    HiveParserASTNode node = (HiveParserASTNode) nd;
+                                    if (node.getType() == HiveASTParser.TOK_TABLE_OR_COL) {
+                                        actualColumnNames.add(node.getChild(0).getText());
+                                    }
+                                }
+                                return null;
+                            });
+            walker.startWalking(Collections.singleton(expression), null);
+        }
+        return actualColumnNames;
+    }
+
+    private void getMacroColumnData(
+            List<FieldSchema> arguments,
+            Set<String> actualColumnNames,
+            HiveParserRowResolver rowResolver,
+            List<String> macroColumnNames,
+            List<TypeInfo> macroColumnTypes)
+            throws SemanticException {
+        for (FieldSchema argument : arguments) {
+            TypeInfo columnType = TypeInfoUtils.getTypeInfoFromTypeString(argument.getType());
+            rowResolver.put(
+                    "",
+                    argument.getName(),
+                    new ColumnInfo(argument.getName(), columnType, "", false));

Review Comment:
   As above



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java:
##########
@@ -506,6 +532,106 @@ private Operation convertCreateFunction(HiveParserASTNode ast) {
         }
     }
 
+    private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException {
+        String macroName = ast.getChild(0).getText();
+        if (FunctionUtils.isQualifiedFunctionName(macroName)) {
+            throw new SemanticException("Temporary macro cannot be created with a qualified name.");
+        }
+
+        List<FieldSchema> arguments = getColumns((HiveParserASTNode) ast.getChild(1), true);
+        Set<String> actualColumnNames = getActualColumnNames(ast, arguments);
+
+        HiveParserRowResolver rowResolver = new HiveParserRowResolver();
+        List<String> macroColumnNames = new ArrayList<>();
+        List<TypeInfo> macroColumnTypes = new ArrayList<>();
+        getMacroColumnData(
+                arguments, actualColumnNames, rowResolver, macroColumnNames, macroColumnTypes);
+        ExprNodeDesc body = getBody(ast, arguments, rowResolver);
+
+        GenericUDFMacro macro =
+                new GenericUDFMacro(macroName, body, macroColumnNames, macroColumnTypes);
+
+        FunctionDefinition macroDefinition =
+                new HiveGenericUDF(
+                        new HiveFunctionWrapper<>(GenericUDFMacro.class.getName(), macro),
+                        hiveShim);
+        // hive's macro is more like flink's temp system function
+        return new CreateTempSystemFunctionOperation(macroName, false, macroDefinition);
+    }
+
+    private Set<String> getActualColumnNames(HiveParserASTNode ast, List<FieldSchema> arguments)
+            throws SemanticException {
+        final Set<String> actualColumnNames = new HashSet<>();
+
+        if (!arguments.isEmpty()) {
+            // Walk down expression to see which arguments are actually used.
+            Node expression = (Node) ast.getChild(2);
+
+            PreOrderWalker walker =
+                    new PreOrderWalker(
+                            (nd, stack, nodeOutputs) -> {
+                                if (nd instanceof HiveParserASTNode) {
+                                    HiveParserASTNode node = (HiveParserASTNode) nd;
+                                    if (node.getType() == HiveASTParser.TOK_TABLE_OR_COL) {
+                                        actualColumnNames.add(node.getChild(0).getText());
+                                    }
+                                }
+                                return null;
+                            });
+            walker.startWalking(Collections.singleton(expression), null);
+        }
+        return actualColumnNames;
+    }
+
+    private void getMacroColumnData(
+            List<FieldSchema> arguments,
+            Set<String> actualColumnNames,
+            HiveParserRowResolver rowResolver,
+            List<String> macroColumnNames,
+            List<TypeInfo> macroColumnTypes)
+            throws SemanticException {
+        for (FieldSchema argument : arguments) {
+            TypeInfo columnType = TypeInfoUtils.getTypeInfoFromTypeString(argument.getType());
+            rowResolver.put(
+                    "",

Review Comment:
   org.apache.commons.lang3.StringUtils.EMPTY?
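   I.e. (sketch):

       rowResolver.put(
               StringUtils.EMPTY,
               argument.getName(),
               new ColumnInfo(argument.getName(), columnType, StringUtils.EMPTY, false));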



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java:
##########
@@ -506,6 +532,106 @@ private Operation convertCreateFunction(HiveParserASTNode ast) {
         }
     }
 
+    private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException {
+        String macroName = ast.getChild(0).getText();
+        if (FunctionUtils.isQualifiedFunctionName(macroName)) {
+            throw new SemanticException("Temporary macro cannot be created with a qualified name.");
+        }
+
+        List<FieldSchema> arguments = getColumns((HiveParserASTNode) ast.getChild(1), true);
+        Set<String> actualColumnNames = getActualColumnNames(ast, arguments);
+
+        HiveParserRowResolver rowResolver = new HiveParserRowResolver();
+        List<String> macroColumnNames = new ArrayList<>();
+        List<TypeInfo> macroColumnTypes = new ArrayList<>();
+        getMacroColumnData(
+                arguments, actualColumnNames, rowResolver, macroColumnNames, macroColumnTypes);
+        ExprNodeDesc body = getBody(ast, arguments, rowResolver);
+
+        GenericUDFMacro macro =
+                new GenericUDFMacro(macroName, body, macroColumnNames, macroColumnTypes);
+
+        FunctionDefinition macroDefinition =
+                new HiveGenericUDF(
+                        new HiveFunctionWrapper<>(GenericUDFMacro.class.getName(), macro),
+                        hiveShim);
+        // hive's macro is more like flink's temp system function
+        return new CreateTempSystemFunctionOperation(macroName, false, macroDefinition);
+    }
+
+    private Set<String> getActualColumnNames(HiveParserASTNode ast, List<FieldSchema> arguments)
+            throws SemanticException {
+        final Set<String> actualColumnNames = new HashSet<>();
+
+        if (!arguments.isEmpty()) {
+            // Walk down expression to see which arguments are actually used.
+            Node expression = (Node) ast.getChild(2);
+
+            PreOrderWalker walker =
+                    new PreOrderWalker(
+                            (nd, stack, nodeOutputs) -> {
+                                if (nd instanceof HiveParserASTNode) {
+                                    HiveParserASTNode node = (HiveParserASTNode) nd;
+                                    if (node.getType() == HiveASTParser.TOK_TABLE_OR_COL) {
+                                        actualColumnNames.add(node.getChild(0).getText());
+                                    }
+                                }
+                                return null;
+                            });
+            walker.startWalking(Collections.singleton(expression), null);
+        }
+        return actualColumnNames;
+    }
+
+    private void getMacroColumnData(

Review Comment:
   Could we return a Tuple2 here directly? BTW, maybe we can merge the methods `getMacroColumnData` and `getBody` into one and return a Tuple3?
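   Roughly like this (a sketch; the merged method's name and the use of Flink's `Tuple3` are assumptions):

       private Tuple3<List<String>, List<TypeInfo>, ExprNodeDesc> getMacroColumnData(
               HiveParserASTNode ast, List<FieldSchema> arguments, Set<String> actualColumnNames)
               throws SemanticException {
           HiveParserRowResolver rowResolver = new HiveParserRowResolver();
           List<String> macroColumnNames = new ArrayList<>();
           List<TypeInfo> macroColumnTypes = new ArrayList<>();
           // ... fill macroColumnNames/macroColumnTypes and validate against actualColumnNames as before ...
           ExprNodeDesc body = getBody(ast, arguments, rowResolver);
           return Tuple3.of(macroColumnNames, macroColumnTypes, body);
       }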



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java:
##########
@@ -506,6 +532,106 @@ private Operation convertCreateFunction(HiveParserASTNode ast) {
         }
     }
 
+    private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException {
+        String macroName = ast.getChild(0).getText();
+        if (FunctionUtils.isQualifiedFunctionName(macroName)) {
+            throw new SemanticException("Temporary macro cannot be created with a qualified name.");
+        }
+
+        List<FieldSchema> arguments = getColumns((HiveParserASTNode) ast.getChild(1), true);
+        Set<String> actualColumnNames = getActualColumnNames(ast, arguments);
+
+        HiveParserRowResolver rowResolver = new HiveParserRowResolver();
+        List<String> macroColumnNames = new ArrayList<>();
+        List<TypeInfo> macroColumnTypes = new ArrayList<>();
+        getMacroColumnData(
+                arguments, actualColumnNames, rowResolver, macroColumnNames, macroColumnTypes);
+        ExprNodeDesc body = getBody(ast, arguments, rowResolver);
+
+        GenericUDFMacro macro =
+                new GenericUDFMacro(macroName, body, macroColumnNames, macroColumnTypes);
+
+        FunctionDefinition macroDefinition =
+                new HiveGenericUDF(
+                        new HiveFunctionWrapper<>(GenericUDFMacro.class.getName(), macro),
+                        hiveShim);
+        // hive's macro is more like flink's temp system function
+        return new CreateTempSystemFunctionOperation(macroName, false, macroDefinition);
+    }
+
+    private Set<String> getActualColumnNames(HiveParserASTNode ast, List<FieldSchema> arguments)
+            throws SemanticException {
+        final Set<String> actualColumnNames = new HashSet<>();
+
+        if (!arguments.isEmpty()) {
+            // Walk down expression to see which arguments are actually used.
+            Node expression = (Node) ast.getChild(2);
+
+            PreOrderWalker walker =
+                    new PreOrderWalker(
+                            (nd, stack, nodeOutputs) -> {
+                                if (nd instanceof HiveParserASTNode) {
+                                    HiveParserASTNode node = (HiveParserASTNode) nd;
+                                    if (node.getType() == HiveASTParser.TOK_TABLE_OR_COL) {
+                                        actualColumnNames.add(node.getChild(0).getText());
+                                    }
+                                }
+                                return null;
+                            });
+            walker.startWalking(Collections.singleton(expression), null);
+        }
+        return actualColumnNames;
+    }
+
+    private void getMacroColumnData(
+            List<FieldSchema> arguments,
+            Set<String> actualColumnNames,
+            HiveParserRowResolver rowResolver,
+            List<String> macroColumnNames,
+            List<TypeInfo> macroColumnTypes)
+            throws SemanticException {
+        for (FieldSchema argument : arguments) {
+            TypeInfo columnType = TypeInfoUtils.getTypeInfoFromTypeString(argument.getType());
+            rowResolver.put(
+                    "",
+                    argument.getName(),
+                    new ColumnInfo(argument.getName(), columnType, "", false));
+            macroColumnNames.add(argument.getName());
+            macroColumnTypes.add(columnType);
+        }
+        Set<String> expectedColumnNames = new LinkedHashSet<>(macroColumnNames);
+        if (!expectedColumnNames.equals(actualColumnNames)) {
+            throw new SemanticException(

Review Comment:
   String.format("Expected columns [%s], but found [%s].", expectedColumnNames, actualColumnNames) here ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org