Posted to dev@flink.apache.org by "JOHN ERVINE (Jira)" <ji...@apache.org> on 2022/03/01 13:22:00 UTC

[jira] [Created] (FLINK-26427) Streaming File Sink Uploading Smaller Versions Of The Same Part File To S3 (Race Condition)

JOHN ERVINE created FLINK-26427:
-----------------------------------

             Summary: Streaming File Sink Uploading Smaller Versions Of The Same Part File To S3 (Race Condition)
                 Key: FLINK-26427
                 URL: https://issues.apache.org/jira/browse/FLINK-26427
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.13.1
            Reporter: JOHN ERVINE


I'm experiencing some odd behaviour when writing ORC files to S3 using Flink's Streaming File Sink.

{code:java}
import java.util.Properties;

import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.hadoop.conf.Configuration;

// Set up the streaming execution environment with exactly-once checkpointing.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(PARAMETER_TOOL_CONFIG.getInt("flink.checkpoint.frequency.ms"), CheckpointingMode.EXACTLY_ONCE);
env.getConfig().enableObjectReuse();

// ORC writer configuration: Snappy-compress the output files.
Properties writerProperties = new Properties();
writerProperties.put("orc.compress", "SNAPPY");

// Order book sink: bulk-write ORC part files, rolling on every checkpoint.
StreamingFileSink<ArmadaRow> orderBookSink = StreamingFileSink
        .forBulkFormat(new Path(PARAMETER_TOOL_CONFIG.get("order.book.sink")),
                new OrcBulkWriterFactory<>(new OrderBookRowVectorizer(F_MD_ORDER_BOOK_GLOBEX_SCHEMA), writerProperties, new Configuration()))
        .withBucketAssigner(new OrderBookBucketingAssigner())
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();{code}
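
OrderBookBucketingAssigner is a custom class and isn't shown above. For context, here is a minimal sketch of what such an assigner might look like, inferred from the Hive-style partition paths in the S3 keys below; the accessors on ArmadaRow are hypothetical:
{code:java}
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Sketch only: routes each row to a partition path shaped like the keys seen in
// the committer log (cyc_dt=.../inst_exch_mrkt_id=.../inst_ast_sub_clas=...).
// The getCycleDate/getMarketId/getAssetSubClass accessors are assumptions,
// not the real ArmadaRow API.
public class OrderBookBucketingAssigner implements BucketAssigner<ArmadaRow, String> {

    @Override
    public String getBucketId(ArmadaRow row, Context context) {
        return "cyc_dt=" + row.getCycleDate()
                + "/inst_exch_mrkt_id=" + row.getMarketId()
                + "/inst_ast_sub_clas=" + row.getAssetSubClass();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}{code}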
 

I noticed when running queries during ingest that my row counts were decreasing as the job progressed. Looking at S3, I can see multiple versions of the same part file. The example below shows that part file 15-7 has two versions: the first file is 20.7 MB, while the last file committed is smaller at 5.1 MB. In most cases the current file is the larger one, but the screenshot below shows a few examples where this is not the case.

 

!https://i.stack.imgur.com/soU4b.png|width=2173,height=603!
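
To cross-check what S3 has stored, the versions of the affected part file can be listed directly. A minimal sketch using the AWS SDK v2 (the bucket name is a placeholder; the key is taken from the committer log below):
{code:java}
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest;
import software.amazon.awssdk.services.s3.model.ObjectVersion;

// Sketch only: lists every stored version of the affected part file so the
// sizes and timestamps can be compared against the committer log entries.
// "my-bucket" is a placeholder, not the real bucket name.
public class ListPartFileVersions {
    public static void main(String[] args) {
        try (S3Client s3 = S3Client.create()) {
            for (ObjectVersion v : s3.listObjectVersions(
                    ListObjectVersionsRequest.builder()
                            .bucket("my-bucket")
                            .prefix("staging/marketdata/t_stg_globex_order_book_test2/"
                                    + "cyc_dt=2021-11-15/inst_exch_mrkt_id=XNYM/"
                                    + "inst_ast_sub_clas=Energy/part-15-7")
                            .build()).versions()) {
                System.out.printf("version=%s size=%d lastModified=%s latest=%s%n",
                        v.versionId(), v.size(), v.lastModified(), v.isLatest());
            }
        }
    }
}{code}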

 

This looks like a typical race condition, or a failure to commit uploads to S3 successfully, because the log below shows two commits for the same file very close together. The last commit is logged at 20:44, but the last-modified date in S3 is 20:43. I don't see any logs indicating a failed commit. This is currently a blocker for us.

{code:java}
2022-02-28T20:44:03.526+0000 INFO  APP=${sys:AppID} COMP=${sys:CompID} APPNAME=${sys:AppName} S3Committer:64 - Committing staging/marketdata/t_stg_globex_order_book_test2/cyc_dt=2021-11-15/inst_exch_mrkt_id=XNYM/inst_ast_sub_clas=Energy/part-15-7 with MPU ID vVhVRh5XtEDmJNrqBCAp.4vcS34FBGoQQjPsE64kBmhkSJJB8T7ZY9codF994n7FBUquF_ls9oFxwoYPl5ZHfP0rkQgJ7aPmHzlB8omIH2ZFbeFNHbXpYS27U9Gl7LOMcEhlekMog4D2eeYUUjr9oA--

2022-02-28T20:44:03.224+0000 INFO  APP=${sys:AppID} COMP=${sys:CompID} APPNAME=${sys:AppName} S3Committer:64 - Committing staging/marketdata/t_stg_globex_order_book_test2/cyc_dt=2021-11-15/inst_exch_mrkt_id=XNYM/inst_ast_sub_clas=Energy/part-15-7 with MPU ID jPnNvBwHtiRBLdDbH6W7duV2Fx1lxsOsPV4IfskMkPygpuVXF9DWsp4xZGxejI8mEVbcrIqF6hC9Tff9IzciK0lMUkTNrXHfRfG3tgkMwbX35T3chbXRN8Tjl0tsUF.oSBhgrGFpKxRxyi3CjRknxA--{code}
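
Since these commits complete S3 multipart uploads (the MPU IDs above), another useful check is whether any uploads for this prefix were started but never completed or aborted. A minimal sketch, again with the AWS SDK v2 and a placeholder bucket name:
{code:java}
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest;
import software.amazon.awssdk.services.s3.model.MultipartUpload;

// Sketch only: lists multipart uploads that are still in progress (started but
// never completed or aborted) under the staging prefix. "my-bucket" is a placeholder.
public class ListPendingUploads {
    public static void main(String[] args) {
        try (S3Client s3 = S3Client.create()) {
            for (MultipartUpload upload : s3.listMultipartUploads(
                    ListMultipartUploadsRequest.builder()
                            .bucket("my-bucket")
                            .prefix("staging/marketdata/t_stg_globex_order_book_test2/")
                            .build()).uploads()) {
                System.out.printf("%s uploadId=%s initiated=%s%n",
                        upload.key(), upload.uploadId(), upload.initiated());
            }
        }
    }
}{code}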



--
This message was sent by Atlassian Jira
(v8.20.1#820001)