Posted to user@beam.apache.org by Marc Matt <ma...@matt-online.info> on 2019/03/14 10:41:39 UTC

python streaming writing hourly avro files

Hi,

Getting messages from Pub/Sub and then saving them into hourly (or other
fixed-interval) files on GCS does not work for me on Cloud Dataflow. The
job only writes the files when I shut it down. Is this not yet supported
by the Python SDK, or am I doing something wrong?

Here is a snippet of my code:
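
For context, pipelineoptions is constructed roughly like this (the
project and bucket names below are placeholders; the job runs in
streaming mode):

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; the real job runs on Dataflow with streaming
# enabled.
pipelineoptions = PipelineOptions(
    runner='DataflowRunner',
    project='MY_PROJECT',
    temp_location='gs://BUCKET/tmp/',
    streaming=True)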

import uuid

import apache_beam as beam
from apache_beam.io import ReadFromPubSub
from apache_beam.io.avroio import WriteToAvro
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AccumulationMode, AfterWatermark

p = beam.Pipeline(options=pipelineoptions)

# Read raw bytes from the Pub/Sub topic.
messages = (
    p
    | 'Read from topic: ' + topic >> ReadFromPubSub(
        topic=input_topic).with_input_types(bytes))

windowed_lines = (
    messages
    # DecodeAvro is my DoFn that decodes each message against the
    # parsed Avro schema.
    | 'decode' >> beam.ParDo(DecodeAvro(), parsed_schema)
    # 60-second fixed windows; emit when the watermark passes the end
    # of each window, discarding fired panes.
    | beam.WindowInto(
        window.FixedWindows(60),
        trigger=AfterWatermark(),
        accumulation_mode=AccumulationMode.DISCARDING))

output = windowed_lines | 'write result' >> WriteToAvro(
    file_path_prefix='gs://BUCKET/streaming/tests/',
    shard_name_template=(topic.split('.')[0] + '_' + str(uuid.uuid4())
                         + '_SSSS-of-NNNN'),
    schema=parsed_schema,
    file_name_suffix='.avro',
    num_shards=2)
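
For what it's worth, the workaround I am starting to look at is the
newer fileio.WriteToFiles transform, which is meant for windowed file
writes (assuming a Beam release that ships apache_beam.io.fileio). A
minimal sketch, writing the decoded records as JSON lines instead of
Avro just to show the shape:

import json

from apache_beam.io import fileio

# Sketch only: would replace the WriteToAvro step above. Records are
# JSON-encoded purely for illustration.
output = (
    windowed_lines
    | 'to json' >> beam.Map(json.dumps)
    | 'write files' >> fileio.WriteToFiles(
        path='gs://BUCKET/streaming/tests/',
        file_naming=fileio.default_file_naming('output', '.json'),
        sink=lambda dest: fileio.TextSink(),
        shards=2))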


Thank you for your help,
Marc