You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "geonyeongkim (via GitHub)" <gi...@apache.org> on 2023/03/13 10:05:55 UTC

[GitHub] [hudi] geonyeongkim opened a new issue, #8164: [SUPPORT] Flink KryoSerializer cannot be cast to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer

geonyeongkim opened a new issue, #8164:
URL: https://github.com/apache/hudi/issues/8164

   **Describe the problem you faced**
   
   Hello.
   I'm going to get the log data in json format from kafka and create an app that loads it into the hudi table using the hudi stream api.
   
   Operation has been set to BULK_INSERT to load log data.
   
   However, if you set it to BULK_INSERT, the casting problem will occur as follows.
   
   `KryoSerializer cannot be cast to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer`
   
   ---
   
   This occurs during the opening of the Sort Operator class.
   - [SortOperator](https://github.com/apache/hudi/blob/25c2f5949384def4b22d1a77f3fe76cc0d493f06/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java#L72)
   
   Flink uses Kryo as the default Serializer.
   
   How can I use Sort Operator to perform BULK_INSERT?
   
   
   **Environment Description**
   
   * Hudi version : 0.12.2
   
   * Flink version : 1.15.1
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] geonyeongkim commented on issue #8164: [SUPPORT] Flink KryoSerializer cannot be cast to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer

Posted by "geonyeongkim (via GitHub)" <gi...@apache.org>.
geonyeongkim commented on issue #8164:
URL: https://github.com/apache/hudi/issues/8164#issuecomment-1469934370

   Hello.
   I looked at HoodieFlinkStreamer in github and used JsonRowDataDeserializationSchema to troubleshoot SortOperator.
   
   I have a few questions about it.
   
   ## 1. BULK_INSERT 
   If the operation is set to BULK_INSERT, there will be no error.
   
   However, it only consumes kafka messages and does not actually create parquet files in hdfs.
   
   My code simply writes kafka messages to the hudi table on the hdfs.
   
   ```kotlin
   @JvmStatic
   fun main(args: Array<String>) {
       val env = StreamExecutionEnvironment.getExecutionEnvironment()
       env.enableCheckpointing(5000)
   
       val props = Configuration()
       props.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, "avro schema")
   
       val rowType = AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(props)).logicalType as RowType
       
       val kafkaSource = KafkaSource.builder<RowData>()
           .setBootstrapServers(bootstrapServers)
           .setTopics(topic)
           .setGroupId(SampleHudiApp::class.java.name)
           .setClientIdPrefix(UUID.randomUUID().toString())
           .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
           .setDeserializer(CustomJsonRowDataDeserializationSchema(
               rowType,
               InternalTypeInfo.of(rowType),
               false,
               true,
               TimestampFormat.ISO_8601
           ))
           .build()
       HoodiePipeline.builder("hudi_test_table")
           .column("id BIGINT")
           .column("name STRING")
           .column("`partition_path` STRING")
           .column("ts BIGINT")
           .column("dc STRING")
           .column("op STRING")
           .pk("id")
           .partition("partition_path")
           .options(mapOf(
               FlinkOptions.PATH.key() to "hdfs:///user/geonyeong.kim/hudi_flink_test",
               FlinkOptions.TABLE_TYPE.key() to HoodieTableType.COPY_ON_WRITE.name,
               FlinkOptions.INDEX_GLOBAL_ENABLED.key() to "false"
           ))
           .sink(env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "hudi_source"), true)
       env.execute("HUDI STREAM SINK")
   }
   ```
   
   **If the WRITE_BULK_INSERT_SORT_INPUT setting is false, it works normally.**
   
   **Is this a bug? Or can WRITE_BULK_INSERT_SORT_INPUT work even if this setting is true?**
   
   ## 2. BULK_INSERT vs APPEND
   
   I viewed org.apache.hudi.sink.utils.Pipelines class.
   And i confirmed that BulkInsertWriteFunction class is used for bulk_insert mode and AppendWriteFunction class is used for append mode.
   
   However, if the index type is not Bucket in BulkInsertWriterFunction, BulkInsertWriterHelper is used.
   
   AppendWriteFunction also uses BulkInsertWriterHelper.
   **Then, if the index type is a FLINK_STATE, will the behavior of the two be the same?**
   
   - [WriterHelpers](https://github.com/apache/hudi/blob/d760ed99734664d0e428ed274ff3e3397724cae9/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/WriterHelpers.java#L35)
   
   - [AppendWriteFunction](https://github.com/apache/hudi/blob/d760ed99734664d0e428ed274ff3e3397724cae9/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java#L115)
   
   ## 3. Compress
   
   I want to use Flink to apply Compress to the COW table when Parquet Write.
   
   In Flink, Hudi Write created HoodieFlinkWriteClient in FlinkWriteClients based on the FlinkOptions value and confirmed that each WriteFunction uses it.
   
   So I overrided the [FlinkWriteClients](https://github.com/apache/hudi/blob/d760ed99734664d0e428ed274ff3e3397724cae9/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java#L208)
    class and added the parquetCompressionCodec("gzip") setting.
   
   <img width="898" alt="image" src="https://user-images.githubusercontent.com/31622350/225310483-488540f4-9055-4d7d-838d-6ba6d52e74c1.png">
   
   **However, compress was not applied.**
   **Is this not applicable in Flink?**


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on issue #8164: [SUPPORT] Flink KryoSerializer cannot be cast to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on issue #8164:
URL: https://github.com/apache/hudi/issues/8164#issuecomment-1467229462

   The input element of the `SortOperator` should be a `RowData`, because the serializer is hard coded into `BinaryRowDataSerializer`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on issue #8164: [SUPPORT] Flink KryoSerializer cannot be cast to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on issue #8164:
URL: https://github.com/apache/hudi/issues/8164#issuecomment-1473313206

   
   1. Did you enable the ckp yet? Flink sink relies the ckp success event for Hudi trasanction commiting;
   2. Both bulk_insert and append_write use the `BulkInsertWriterHelper` to write the parquet files direcly, there is no UPSERTs, if `FLINK_STATE` is used, things are very diffrent, the `StreamWriteFunction` would kick in;
   3. You can just set up the compress options within the Flink SQL options, or the `HoodiePipeline#options` you have used:
   
   e.g.
   
   ```sql
   create table xxx(
   ) with (
     'connector' = 'hudi',
     'hoodie.parquet.compression.codec' = 'gzip'
   );
   ```
   
   ```java
   HoodiePipeline.builder("xxx")
       .option("hoodie.parquet.compression.codec", "gzip")
   ```
   
   The default codec is already gzip, probably that is the reason you do not perceive any difference


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] geonyeongkim commented on issue #8164: [SUPPORT] Flink KryoSerializer cannot be cast to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer

Posted by "geonyeongkim (via GitHub)" <gi...@apache.org>.
geonyeongkim commented on issue #8164:
URL: https://github.com/apache/hudi/issues/8164#issuecomment-1471536137

   @danny0405 
   Hello.
   Could you answer the question above?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] geonyeongkim commented on issue #8164: [SUPPORT] Flink KryoSerializer cannot be cast to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer

Posted by "geonyeongkim (via GitHub)" <gi...@apache.org>.
geonyeongkim commented on issue #8164:
URL: https://github.com/apache/hudi/issues/8164#issuecomment-1476002948

   ### 1. Did you enable the ckp yet? Flink sink relies the ckp success event for Hudi trasanction commiting;
   
   Ckp means checkpoint, right?
   
   As shown in the attached picture, checkpoint is performed normally.
   
   But still no file in hdfs while consuming kafka message.
   
   Moreover, the problem is that we are committing to the kafka broker.
   
   **checkpoint**
   ![image](https://user-images.githubusercontent.com/31622350/226316175-b20f6f51-bc5f-490f-bc1b-58b03df20ec8.png)
   
   **hdfs directory**
   ![image](https://user-images.githubusercontent.com/31622350/226316796-b89fb2ec-a754-47ed-8cee-0558c809ef45.png)
   
   ### 2. Both bulk_insert and append_write use the BulkInsertWriterHelper to write the parquet files direcly, there is no UPSERTs, if FLINK_STATE is used, things are very diffrent, the StreamWriteFunction would kick in;
   
   Then, in case of FLINK_STATE, can you tell me the difference between bulk_insert and append in detail?
   
   ### 3. You can just set up the compress options within the Flink SQL options, or the HoodiePipeline
   
   I tried to restart by adding the settings below as a guide.
   
   ```java
   HoodiePipeline.builder("xxx")
       .option("hoodie.parquet.compression.codec", "gzip")
   ```
   
   However, gzip compression still does not apply.
   
   ---
   
   I know that compression is difficult to apply in the stream associated with Hadoop.
   
   **But it's very strange that bulk_insert doesn't work.**


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on issue #8164: [SUPPORT] Flink KryoSerializer cannot be cast to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on issue #8164:
URL: https://github.com/apache/hudi/issues/8164#issuecomment-1477370558

   > Then, in case of FLINK_STATE, can you tell me the difference between bulk_insert and append in detail?
   
   Flink state index only works for UPSERT operation, not BULK_INSERT.
   
   > But it's very strange that bulk_insert doesn't work.
   
   Bulk insert only works in batch execution mode.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org