Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/08 10:23:41 UTC

[GitHub] [flink] luoyuxia commented on a diff in pull request #19520: [FLINK-26366][hive] support insert overwrite directory

luoyuxia commented on code in PR #19520:
URL: https://github.com/apache/flink/pull/19520#discussion_r892187726


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java:
##########
@@ -285,6 +305,81 @@ public Operation createInsertOperation(HiveParserCalcitePlanner analyzer, RelNod
                 Collections.emptyMap());
     }
 
+    private SinkModifyOperation createInsertIntoDirectoryOperation(
+            HiveParserQB topQB, QBMetaData qbMetaData, RelNode queryRelNode, HiveConf hiveConf) {
+        String dest = topQB.getParseInfo().getClauseNamesForDest().iterator().next();
+        // get the location for insert into directory
+        String location = qbMetaData.getDestFileForAlias(dest);
+        // check whether the target is a local directory
+        boolean isToLocal = qbMetaData.getDestTypeForAlias(dest) == QBMetaData.DEST_LOCAL_FILE;
+        HiveParserDirectoryDesc directoryDesc = topQB.getDirectoryDesc();
+
+        // set row format / stored as / location
+        Map<String, String> props = new HashMap<>();
+        HiveParserDDLSemanticAnalyzer.encodeRowFormat(directoryDesc.getRowFormatParams(), props);
+        HiveParserDDLSemanticAnalyzer.encodeStorageFormat(directoryDesc.getStorageFormat(), props);
+        props.put(TABLE_LOCATION_URI, location);
+
+        props.put(FactoryUtil.CONNECTOR.key(), HiveCatalogFactoryOptions.IDENTIFIER);
+        // mark this operation as an insert into a directory
+        props.put(CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX + IS_INSERT_DIRECTORY, "true");
+        // mark whether the target is a local directory
+        props.put(
+                CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX + IS_TO_LOCAL_DIRECTORY,
+                String.valueOf(isToLocal));
+
+        List<RelDataTypeField> fieldList = queryRelNode.getRowType().getFieldList();
+        String[] colNameArr = new String[fieldList.size()];
+        String[] colTypeArr = new String[fieldList.size()];
+        for (int i = 0; i < fieldList.size(); i++) {
+            colNameArr[i] = fieldList.get(i).getName();
+            TypeInfo typeInfo = HiveParserTypeConverter.convert(fieldList.get(i).getType());
+            if (typeInfo.equals(TypeInfoFactory.voidTypeInfo)) {
+                colTypeArr[i] = TypeInfoFactory.stringTypeInfo.getTypeName();
+            } else {
+                colTypeArr[i] = typeInfo.getTypeName();
+            }
+        }
+
+        String colNames = String.join(",", colNameArr);
+        String colTypes = String.join(":", colTypeArr);
+        props.put("columns", colNames);
+        props.put("columns.types", colTypes);
+
+        PlannerQueryOperation plannerQueryOperation = new PlannerQueryOperation(queryRelNode);
+        return new SinkModifyOperation(
+                createDummyTableForInsertDirectory(
+                        plannerQueryOperation.getResolvedSchema(), props),
+                new PlannerQueryOperation(queryRelNode),
+                Collections.emptyMap(),
+                true, // insert into directory is always for overwrite
+                Collections.emptyMap());
+    }
+
+    private ContextResolvedTable createDummyTableForInsertDirectory(
+            ResolvedSchema resolvedSchema, Map<String, String> props) {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        for (Column column : resolvedSchema.getColumns()) {
+            schemaBuilder.column(column.getName(), column.getDataType());
+        }
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        schemaBuilder.build(),
+                        "a dummy table for the case of insert into directory ",
+                        Collections.emptyList(),
+                        props);
+        ResolvedCatalogTable resolvedCatalogTable =
+                new ResolvedCatalogTable(catalogTable, resolvedSchema);
+        String currentCatalog = catalogManager.getCurrentCatalog();
+        // the object name is arbitrary; it is only a placeholder and is never actually used
+        String objectName = "insert_directory_tbl";

Review Comment:
   There is no conflict here, because the table is never actually created. The name is only a placeholder.
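
To illustrate the point, here is a minimal sketch that does not use any Flink classes. `ToyCatalog` and `placeholderName` are hypothetical names invented for this example, and the `hive` / `default` catalog and database names are assumptions. It only shows that building a name without registering it cannot collide with an existing table of the same name; a conflict could arise only if the placeholder were passed to a create call.

```java
import java.util.HashMap;
import java.util.Map;

public class PlaceholderSketch {

    /** Hypothetical stand-in for a catalog; this is not a Flink API. */
    static final class ToyCatalog {
        private final Map<String, String> tables = new HashMap<>();

        /** Registers a table and fails if the name is already taken. */
        void createTable(String qualifiedName, String description) {
            if (tables.putIfAbsent(qualifiedName, description) != null) {
                throw new IllegalStateException("Table already exists: " + qualifiedName);
            }
        }

        boolean tableExists(String qualifiedName) {
            return tables.containsKey(qualifiedName);
        }

        int size() {
            return tables.size();
        }
    }

    /** Builds a qualified name only; it never touches any catalog. */
    static String placeholderName(String catalog, String database) {
        return catalog + "." + database + ".insert_directory_tbl";
    }

    public static void main(String[] args) {
        ToyCatalog catalog = new ToyCatalog();
        // A real user table that happens to share the placeholder's name.
        catalog.createTable("hive.default.insert_directory_tbl", "a user table");

        // The placeholder is only carried along with the sink operation.
        // Because createTable is never called for it, nothing can conflict.
        String placeholder = placeholderName("hive", "default");

        System.out.println("placeholder = " + placeholder);
        System.out.println("user table still present: " + catalog.tableExists(placeholder));
        System.out.println("tables in catalog: " + catalog.size());
    }
}
```

In the PR, the dummy table plays the same role: it carries the schema and properties for the sink, and it is never created in the catalog.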


