Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/29 07:40:09 UTC

[GitHub] [flink] wuchong commented on a diff in pull request #19520: [FLINK-26366][hive] support insert overwrite directory

wuchong commented on code in PR #19520:
URL: https://github.com/apache/flink/pull/19520#discussion_r932941561


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java:
##########
@@ -209,7 +227,8 @@ public HiveParserDMLHelper(
                 identifier, new PlannerQueryOperation(queryRelNode), staticPartSpec, overwrite);
     }
 
-    public Operation createInsertOperation(HiveParserCalcitePlanner analyzer, RelNode queryRelNode)
+    public Operation createInsertOperation(
+            HiveParserCalcitePlanner analyzer, RelNode queryRelNode, HiveConf hiveConf)

Review Comment:
   The new `hiveConf` parameter appears to be unused. Is it needed?



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java:
##########
@@ -285,6 +304,78 @@ public Operation createInsertOperation(HiveParserCalcitePlanner analyzer, RelNod
                 Collections.emptyMap());
     }
 
+    private SinkModifyOperation createInsertIntoDirectoryOperation(
+            HiveParserQB topQB, QBMetaData qbMetaData, RelNode queryRelNode) {
+        String dest = topQB.getParseInfo().getClauseNamesForDest().iterator().next();
+        // get the location for insert into directory
+        String location = qbMetaData.getDestFileForAlias(dest);
+        // get whether it's for insert local directory
+        boolean isToLocal = qbMetaData.getDestTypeForAlias(dest) == QBMetaData.DEST_LOCAL_FILE;
+        HiveParserDirectoryDesc directoryDesc = topQB.getDirectoryDesc();
+
+        // set row format / stored as / location
+        Map<String, String> props = new HashMap<>();
+        HiveParserDDLSemanticAnalyzer.encodeRowFormat(directoryDesc.getRowFormatParams(), props);
+        HiveParserDDLSemanticAnalyzer.encodeStorageFormat(directoryDesc.getStorageFormat(), props);
+        props.put(TABLE_LOCATION_URI, location);
+
+        props.put(FactoryUtil.CONNECTOR.key(), HiveCatalogFactoryOptions.IDENTIFIER);
+        // mark it's for insert into directory
+        props.put(CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX + IS_INSERT_DIRECTORY, "true");
+        // mark it's for insert into local directory or not
+        props.put(
+                CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX + IS_TO_LOCAL_DIRECTORY,
+                String.valueOf(isToLocal));
+
+        List<RelDataTypeField> fieldList = queryRelNode.getRowType().getFieldList();
+        String[] colNameArr = new String[fieldList.size()];
+        String[] colTypeArr = new String[fieldList.size()];
+        for (int i = 0; i < fieldList.size(); i++) {
+            colNameArr[i] = fieldList.get(i).getName();
+            TypeInfo typeInfo = HiveParserTypeConverter.convert(fieldList.get(i).getType());
+            if (typeInfo.equals(TypeInfoFactory.voidTypeInfo)) {
+                colTypeArr[i] = TypeInfoFactory.stringTypeInfo.getTypeName();
+            } else {
+                colTypeArr[i] = typeInfo.getTypeName();
+            }
+        }
+
+        String colNames = String.join(",", colNameArr);
+        String colTypes = String.join(":", colTypeArr);
+        props.put("columns", colNames);
+        props.put("columns.types", colTypes);
+
+        PlannerQueryOperation plannerQueryOperation = new PlannerQueryOperation(queryRelNode);

Review Comment:
   `createInsertOperationInfo()` applies some conversions to the `queryRelNode` (e.g. type conversion, static partitions). Should we reuse that logic here?
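
   For reference, the column encoding done by the loop in the hunk above (names joined with `,`, type names joined with `:`, and `void` columns written as `string`) could be isolated in a small helper, which may make it easier to share with the regular insert path. This is only a minimal sketch: plain type-name strings stand in for Hive `TypeInfo`, the literal `"void"` and `"string"` names are assumed, and `DirectorySchemaProps` and `encodeSchema` are hypothetical names that are not part of the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DirectorySchemaProps {
    // Assumed stand-ins for the Hive type names used in the PR
    // (TypeInfoFactory.voidTypeInfo / stringTypeInfo are not referenced here).
    static final String VOID_TYPE = "void";
    static final String STRING_TYPE = "string";

    /** Builds the "columns" and "columns.types" properties from parallel lists. */
    static Map<String, String> encodeSchema(List<String> names, List<String> typeNames) {
        if (names.size() != typeNames.size()) {
            throw new IllegalArgumentException("names and typeNames must have the same size");
        }
        List<String> types = new ArrayList<>(typeNames.size());
        for (String typeName : typeNames) {
            // A void-typed column (e.g. a NULL literal) is written as string,
            // mirroring the fallback in the hunk above.
            types.add(VOID_TYPE.equals(typeName) ? STRING_TYPE : typeName);
        }
        Map<String, String> props = new LinkedHashMap<>();
        props.put("columns", String.join(",", names));
        props.put("columns.types", String.join(":", types));
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props =
                encodeSchema(
                        Arrays.asList("id", "name", "note"),
                        Arrays.asList("int", "string", "void"));
        System.out.println(props);
    }
}
```

   Keeping the encoding free of planner state like this is just one option; the actual type conversion would still need to go through `HiveParserTypeConverter` as in the PR.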


