Posted to commits@hudi.apache.org by "geonyeongkim (via GitHub)" <gi...@apache.org> on 2023/03/15 12:41:57 UTC

[GitHub] [hudi] geonyeongkim commented on issue #8164: [SUPPORT] Flink KryoSerializer cannot be cast to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer

geonyeongkim commented on issue #8164:
URL: https://github.com/apache/hudi/issues/8164#issuecomment-1469934370

   Hello.
   I looked at HoodieFlinkStreamer on GitHub and used JsonRowDataDeserializationSchema to work around the SortOperator issue.
   
   I have a few questions about it.
   
   ## 1. BULK_INSERT 
   If the operation is set to BULK_INSERT, there is no error.
   
   However, the job only consumes the Kafka messages and does not actually create Parquet files in HDFS.
   
   My code simply writes Kafka messages to a Hudi table on HDFS.
   
   ```kotlin
   import org.apache.flink.api.common.eventtime.WatermarkStrategy
   import org.apache.flink.configuration.Configuration
   import org.apache.flink.connector.kafka.source.KafkaSource
   import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
   import org.apache.flink.formats.common.TimestampFormat
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
   import org.apache.flink.table.data.RowData
   import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
   import org.apache.flink.table.types.logical.RowType
   import org.apache.hudi.common.model.HoodieTableType
   import org.apache.hudi.configuration.FlinkOptions
   import org.apache.hudi.util.AvroSchemaConverter
   import org.apache.hudi.util.HoodiePipeline
   import org.apache.hudi.util.StreamerUtil
   import org.apache.kafka.clients.consumer.OffsetResetStrategy
   import java.util.UUID
   
   object SampleHudiApp {
   
       // Placeholders: the real broker list and topic are omitted here.
       private const val bootstrapServers = "..."
       private const val topic = "..."
   
       @JvmStatic
       fun main(args: Array<String>) {
           val env = StreamExecutionEnvironment.getExecutionEnvironment()
           env.enableCheckpointing(5000)
   
           val props = Configuration()
           props.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, "avro schema") // actual Avro schema string
   
           // Derive the Flink RowType from the source Avro schema.
           val rowType = AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(props)).logicalType as RowType
   
           val kafkaSource = KafkaSource.builder<RowData>()
               .setBootstrapServers(bootstrapServers)
               .setTopics(topic)
               .setGroupId(SampleHudiApp::class.java.name)
               .setClientIdPrefix(UUID.randomUUID().toString())
               .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
               // My custom wrapper around JsonRowDataDeserializationSchema (same constructor arguments).
               .setDeserializer(CustomJsonRowDataDeserializationSchema(
                   rowType,
                   InternalTypeInfo.of(rowType),
                   false,
                   true,
                   TimestampFormat.ISO_8601
               ))
               .build()
   
           HoodiePipeline.builder("hudi_test_table")
               .column("id BIGINT")
               .column("name STRING")
               .column("`partition_path` STRING")
               .column("ts BIGINT")
               .column("dc STRING")
               .column("op STRING")
               .pk("id")
               .partition("partition_path")
               .options(mapOf(
                   FlinkOptions.PATH.key() to "hdfs:///user/geonyeong.kim/hudi_flink_test",
                   FlinkOptions.TABLE_TYPE.key() to HoodieTableType.COPY_ON_WRITE.name,
                   FlinkOptions.INDEX_GLOBAL_ENABLED.key() to "false"
               ))
               .sink(env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "hudi_source"), true)
   
           env.execute("HUDI STREAM SINK")
       }
   }
   ```
   
   **If the WRITE_BULK_INSERT_SORT_INPUT setting is false, it works normally.**
   
   **Is this a bug, or is there a way to make bulk insert work with WRITE_BULK_INSERT_SORT_INPUT set to true?**
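   
   For reference, here is a minimal sketch of the options map I pass to HoodiePipeline for this bulk insert test. I am assuming FlinkOptions.OPERATION / WriteOperationType is the right way to select the operation; the path and table type are the ones from my job above.
   
   ```kotlin
   // Options passed to HoodiePipeline.builder(...).options(...) for the BULK_INSERT test.
   // Only with WRITE_BULK_INSERT_SORT_INPUT=false do Parquet files actually show up in HDFS.
   val options = mapOf(
       FlinkOptions.PATH.key() to "hdfs:///user/geonyeong.kim/hudi_flink_test",
       FlinkOptions.TABLE_TYPE.key() to HoodieTableType.COPY_ON_WRITE.name,
       FlinkOptions.OPERATION.key() to WriteOperationType.BULK_INSERT.value(),
       FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT.key() to "false"  // true reproduces the problem
   )
   ```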
   
   ## 2. BULK_INSERT vs APPEND
   
   I looked at the org.apache.hudi.sink.utils.Pipelines class and confirmed that BulkInsertWriteFunction is used for bulk_insert mode and AppendWriteFunction is used for append mode.
   
   However, when the index type is not BUCKET, BulkInsertWriteFunction also ends up using BulkInsertWriterHelper (via WriterHelpers).
   
   AppendWriteFunction uses BulkInsertWriterHelper as well.
   **So if the index type is FLINK_STATE, will the behavior of the two be the same?** (I sketch the two configurations below the links.)
   
   - [WriterHelpers](https://github.com/apache/hudi/blob/d760ed99734664d0e428ed274ff3e3397724cae9/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/WriterHelpers.java#L35)
   
   - [AppendWriteFunction](https://github.com/apache/hudi/blob/d760ed99734664d0e428ed274ff3e3397724cae9/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java#L115)
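   
   To make the question concrete, this is roughly how I switch between the two paths. A sketch only: `baseOptions` stands for the common options from my job above, and I am assuming that operation=insert on a COW table goes through AppendWriteFunction while operation=bulk_insert goes through BulkInsertWriteFunction.
   
   ```kotlin
   // Run A: bulk_insert -> BulkInsertWriteFunction -> WriterHelpers -> BulkInsertWriterHelper
   val bulkInsertOptions = baseOptions + mapOf(
       FlinkOptions.OPERATION.key() to WriteOperationType.BULK_INSERT.value(),
       FlinkOptions.INDEX_TYPE.key() to HoodieIndex.IndexType.FLINK_STATE.name
   )
   
   // Run B: insert (append mode) -> AppendWriteFunction, which also writes via BulkInsertWriterHelper
   val appendOptions = baseOptions + mapOf(
       FlinkOptions.OPERATION.key() to WriteOperationType.INSERT.value(),
       FlinkOptions.INDEX_TYPE.key() to HoodieIndex.IndexType.FLINK_STATE.name
   )
   ```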
   
   ## 3. Compress
   
   I want to apply compression to the Parquet files of a COW table when writing with Flink.
   
   I saw that for Flink writes, Hudi creates a HoodieFlinkWriteClient in FlinkWriteClients based on the FlinkOptions values, and I confirmed that each WriteFunction uses it.
   
   So I overrode the [FlinkWriteClients](https://github.com/apache/hudi/blob/d760ed99734664d0e428ed274ff3e3397724cae9/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java#L208) class and added the parquetCompressionCodec("gzip") setting.
   
   <img width="898" alt="image" src="https://user-images.githubusercontent.com/31622350/225310483-488540f4-9055-4d7d-838d-6ba6d52e74c1.png">
   
   **However, the compression was not applied.**
   **Is setting the Parquet compression codec not supported in Flink?**
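   
   Would something like the following be the intended way to set the codec instead? This is just a guess: I am assuming that raw hoodie.* keys placed in the HoodiePipeline options map are forwarded into the HoodieWriteConfig that FlinkWriteClients builds.
   
   ```kotlin
   // Config-only attempt instead of overriding FlinkWriteClients:
   // pass the raw Hudi key and rely on it being forwarded to the Parquet writer config.
   val optionsWithCompression = mapOf(
       FlinkOptions.PATH.key() to "hdfs:///user/geonyeong.kim/hudi_flink_test",
       FlinkOptions.TABLE_TYPE.key() to HoodieTableType.COPY_ON_WRITE.name,
       "hoodie.parquet.compression.codec" to "gzip"  // assumption: picked up when writing Parquet
   )
   ```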

