Posted to commits@hudi.apache.org by "longjuncai1994 (via GitHub)" <gi...@apache.org> on 2023/02/15 03:45:06 UTC

[GitHub] [hudi] longjuncai1994 opened a new issue, #7953: [SUPPORT]Code pending on writing data to S3 using Flink datastream API,and the target path is empty.

longjuncai1994 opened a new issue, #7953:
URL: https://github.com/apache/hudi/issues/7953

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?  yes
   
   **Describe the problem you faced**
   
   I want to write data to S3 from my local IDE, and I set the AWS credentials in my code (I found the configuration keys in the source code). The configuration seems to work, because it no longer throws an exception. But my job hangs after the log line "s3a-file-system metrics system started".
   
   ```
   val source =
         env.generateSequence(1,10)
         .map(d=>{
           i += 1
           val rowData = new GenericRowData(4)
           rowData.setField(0,StringData.fromString(i+""))
           rowData.setField(1,StringData.fromString(d+""))
           rowData.setField(2,StringData.fromString("p1"))
           rowData.setField(3,TimestampData.fromEpochMillis(1676427048588L))
           println("data: " + rowData.toString)
           rowData.asInstanceOf[RowData]
         }).javaStream
   
         val path = "s3a://test/hudi"
         val table = "huditable"
         val options = new util.HashMap[String,String]()
         options.put(FlinkOptions.PATH.key, path)
         options.put(FlinkOptions.TABLE_TYPE.key, HoodieTableType.MERGE_ON_READ.name)
         options.put("fs.defaultFS","s3a://test")
         options.put("hadoop.fs.s3a.access.key","mykey")
         options.put("hadoop.fs.s3a.secret.key","mykey")
         options.put("hadoop.fs.s3a.endpoint","myendpoint")
         options.put("hadoop.fs.s3a.region","myregion")
   
         val builder = HoodiePipeline.builder(table)
           .column("uuid VARCHAR(20)")
           .column("content VARCHAR(255)")
           .column("ps VARCHAR(20)")
           .column("ts TIMESTAMP(3)")
           .pk("uuid")
           .partition("ps")
           .options(options)
         builder.sink(source,false)
         env.execute()
   ```
   
   As you can see, I print the records in the map function, but stdout is empty, so no data was processed.
   
   The code does not throw any exception, but it hangs here:
   ```
   [INFO ] 2023-02-15 10:22:01,098:Registering TaskManager with ResourceID e8ee6bfd-f75d-4486-9b7a-45fdf0befd68 (akka://flink/user/rpc/taskmanager_0) at ResourceManager
   [INFO ] 2023-02-15 10:22:01,100:Successful registration at resource manager akka://flink/user/rpc/resourcemanager_1 under registration id 7f51c4797923db47714b2ca1ab8d84ab.
   [INFO ] 2023-02-15 10:22:01,100:Received JobGraph submission 'Flink Streaming Job' (f9929ff372b45550c0560a998e1a7041).
   [INFO ] 2023-02-15 10:22:01,101:Submitting job 'Flink Streaming Job' (f9929ff372b45550c0560a998e1a7041).
   [INFO ] 2023-02-15 10:22:01,116:Proposing leadership to contender LeaderContender: JobMasterServiceLeadershipRunner
   [INFO ] 2023-02-15 10:22:01,128:Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_3 .
   [INFO ] 2023-02-15 10:22:01,134:Initializing job 'Flink Streaming Job' (f9929ff372b45550c0560a998e1a7041).
   [INFO ] 2023-02-15 10:22:01,154:Using restart back off time strategy NoRestartBackoffTimeStrategy for Flink Streaming Job (f9929ff372b45550c0560a998e1a7041).
   [INFO ] 2023-02-15 10:22:01,196:Running initialization on master for job Flink Streaming Job (f9929ff372b45550c0560a998e1a7041).
   [INFO ] 2023-02-15 10:22:01,196:Successfully ran initialization on master in 0 ms.
   [INFO ] 2023-02-15 10:22:01,292:Built 1 new pipelined regions in 1 ms, total 1 pipelined regions currently.
   [INFO ] 2023-02-15 10:22:01,300:No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@6f4f4b24
   [INFO ] 2023-02-15 10:22:01,300:State backend loader loads the state backend as HashMapStateBackend
   [INFO ] 2023-02-15 10:22:01,303:Checkpoint storage is set to 'jobmanager'
   [INFO ] 2023-02-15 10:22:01,339:No checkpoint found during restore.
   [INFO ] 2023-02-15 10:22:01,346:Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@309d741 for Flink Streaming Job (f9929ff372b45550c0560a998e1a7041).
   [INFO ] 2023-02-15 10:22:01,354:Received confirmation of leadership for leader akka://flink/user/rpc/jobmanager_3 , session=9934221e-e0be-4027-a8ce-975fedab52c5
   [INFO ] 2023-02-15 10:22:01,357:Starting execution of job 'Flink Streaming Job' (f9929ff372b45550c0560a998e1a7041) under job master id a8ce975fedab52c59934221ee0be4027.
   [WARN ] 2023-02-15 10:22:13,961:Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
   [INFO ] 2023-02-15 10:22:13,972:Scheduled Metric snapshot period at 10 second(s).
   [INFO ] 2023-02-15 10:22:13,972:s3a-file-system metrics system started
   ```
   
   I have tried writing the same data with Spark, and it worked well.
   
   So what is the problem with my Flink code?
   
   I didn't find any more examples of the Flink DataStream API in the Hudi docs.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Generate the records.
   
   2. Set the configurations.
   
   3. Sink to S3.
   
   
   **Environment Description**
   
   * Hudi version : 0.12.2
   
   * Flink version : 1.15.2
   
   * Hadoop version : 3.3.4
   
   * Scala version : 2.12
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   
   **Stacktrace**
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] yihua commented on issue #7953: [SUPPORT]Code pending on writing data to S3 using Flink datastream API,and the target path is empty.

Posted by "yihua (via GitHub)" <gi...@apache.org>.
yihua commented on issue #7953:
URL: https://github.com/apache/hudi/issues/7953#issuecomment-1431893619

   @danny0405 could you help here on the Hudi Flink setup?




[GitHub] [hudi] SwapnilKhante commented on issue #7953: [SUPPORT]Code pending on writing data to S3 using Flink datastream API,and the target path is empty.

Posted by "SwapnilKhante (via GitHub)" <gi...@apache.org>.
SwapnilKhante commented on issue #7953:
URL: https://github.com/apache/hudi/issues/7953#issuecomment-1519922122

   Could you please share your pom.xml as well?




[GitHub] [hudi] danny0405 commented on issue #7953: [SUPPORT]Code pending on writing data to S3 using Flink datastream API,and the target path is empty.

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on issue #7953:
URL: https://github.com/apache/hudi/issues/7953#issuecomment-1521153640

   @longjuncai1994 Did you enable the checkpoint? Did you use batch execution mode?
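
   For reference, a minimal sketch of the two settings asked about here, using the Flink 1.15 Scala DataStream API. The 10-second interval, the object name `CheckpointSketch`, and the placeholder pipeline are assumptions for illustration, not values from this issue. The Hudi Flink writer commits data when a checkpoint completes, so a streaming job without checkpointing may never produce a commit; whether that explains the hang reported above is not confirmed in this thread.

   ```scala
   import org.apache.flink.api.common.RuntimeExecutionMode
   import org.apache.flink.streaming.api.scala._

   // Hypothetical object name, used only for this sketch.
   object CheckpointSketch {
     def main(args: Array[String]): Unit = {
       val env = StreamExecutionEnvironment.getExecutionEnvironment

       // Option 1: enable periodic checkpoints (interval in milliseconds).
       // 10000L is an assumed value; tune it for the actual job.
       env.enableCheckpointing(10000L)

       // Option 2 (alternative for bounded input): run in batch execution mode.
       // env.setRuntimeMode(RuntimeExecutionMode.BATCH)

       // Placeholder pipeline standing in for the Hudi sink from the issue.
       env.fromElements(1, 2, 3).print()
       env.execute("checkpoint-sketch")
     }
   }
   ```

   In the code from the issue, either call would go on `env` before `builder.sink(source, false)` is invoked.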

