You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "stream2000 (via GitHub)" <gi...@apache.org> on 2023/04/02 15:55:25 UTC

[GitHub] [hudi] stream2000 commented on a diff in pull request #8076: [HUDI-5884] Support bulk_insert for insert_overwrite and insert_overwrite_table

stream2000 commented on code in PR #8076:
URL: https://github.com/apache/hudi/pull/8076#discussion_r1155332765


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -770,66 +770,70 @@ object HoodieSparkSqlWriter {
     }
   }
 
-  def bulkInsertAsRow(sqlContext: SQLContext,
+  def bulkInsertAsRow(writeClient: SparkRDDWriteClient[_],
+                      parameters: Map[String, String],
                       hoodieConfig: HoodieConfig,
                       df: DataFrame,
+                      mode: SaveMode,
                       tblName: String,
                       basePath: Path,
-                      path: String,
                       instantTime: String,
                       writerSchema: Schema,
-                      isTablePartitioned: Boolean): (Boolean, common.util.Option[String]) = {
+                      tableConfig: HoodieTableConfig):
+  (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
     if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
       throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
     }
+    val sqlContext = writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext
+    val jsc = writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getJavaSparkContext
 
     val writerSchemaStr = writerSchema.toString
 
-    val opts = hoodieConfig.getProps.toMap ++
+    // Make opts mutable since it could be modified by tryOverrideParquetWriteLegacyFormatProperty
+    val opts = mutable.Map() ++ hoodieConfig.getProps.toMap ++
       Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchemaStr)
 
-    val writeConfig = DataSourceUtils.createHoodieConfig(writerSchemaStr, path, tblName, mapAsJavaMap(opts))
-    val populateMetaFields = hoodieConfig.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS)
-
-    val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
-      val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
-      if (userDefinedBulkInsertPartitionerOpt.isPresent) {
-        userDefinedBulkInsertPartitionerOpt.get
-      } else {
-        BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig, isTablePartitioned)
-      }
-    } else {
-      // Sort modes are not yet supported when meta fields are disabled
-      new NonSortPartitionerWithRows()
+    // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
+    tryOverrideParquetWriteLegacyFormatProperty(opts, convertAvroSchemaToStructType(writerSchema))
+    val writeConfig = DataSourceUtils.createHoodieConfig(writerSchemaStr, basePath.toString, tblName, opts)
+    val executor = mode match {
+      case SaveMode.Append =>
+        new DatasetBulkInsertActionExecutor(writeConfig, writeClient, instantTime)

Review Comment:
   Could we use writeClient to do the insert overwrite indead of calling the xxxActionExecutor directly? 



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableActionExecutor.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commit;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class DatasetBulkInsertOverwriteTableActionExecutor extends DatasetBulkInsertOverwriteActionExecutor {

Review Comment:
   Ditto, DatasetBulkInsertOverwriteTableCommitActionExecutor



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteActionExecutor.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commit;
+
+import org.apache.hudi.HoodieDatasetBulkInsertHelper;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class DatasetBulkInsertOverwriteActionExecutor extends BaseDatasetBulkCommitActionExecutor {

Review Comment:
   Ditto, DatasetBulkInsertOverwriteCommitActionExecutor



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertActionExecutor.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commit;
+
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieInternalConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.DataSourceInternalWriterHelper;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class DatasetBulkInsertActionExecutor extends BaseDatasetBulkCommitActionExecutor {

Review Comment:
   Maybe we should change DatasetBulkInsertActionExecutor -> DatasetBulkInsertCommitActionExecutor  since it is sub class of BaseCommitActionExecutor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org