Posted to user@beam.apache.org by Irina Sandu <ms...@google.com> on 2021/08/10 13:28:51 UTC

Dataflow job gets stuck

Hi all!
We are trying to run a simple Dataflow job that just reads from a GCS
file. The job starts and runs for approximately an hour, after which it
terminates with the following error:

Workflow failed. Causes: The Dataflow job appears to be stuck because no
worker activity has been seen in the last 1h. Please check the worker logs
in Stackdriver Logging. You can also get help with Cloud Dataflow at
https://cloud.google.com/dataflow/support.

This is how the pipeline is run:
    pipeline_options = PipelineOptions(argv)
    rec_options = pipeline_options.view_as(RecOptions)
    filename = 'gs://<path-to-file-in-gcs>'

    p = beam.Pipeline(options=pipeline_options)
    _ = (p
         | 'ReadTextFile' >> beam.io.textio.ReadFromText(filename))
    p.run().wait_until_finish()

In our requirements file we only have: apache_beam[gcp]==2.26.0

Do you have any idea why this might happen?

Re: Dataflow job gets stuck

Posted by Irina Sandu <ms...@google.com>.
Hello!

We got it to work just by isolating our pipeline in a separate folder.
My guess is that the wrong setup file was being picked up.

Thanks a lot for responding!
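
One way to guard against the situation guessed at above (a stray setup file being picked up) is to pass the dependency files explicitly instead of relying on the working directory. The sketch below is illustrative only and was not part of the thread: explicit_dependency_args is a hypothetical helper, and the file names setup.py and requirements.txt are assumptions. --setup_file and --requirements_file are the Beam Python options for these files.

```python
from pathlib import Path


def explicit_dependency_args(pipeline_dir, setup_name="setup.py",
                             requirements_name="requirements.txt"):
    """Build Beam flags pointing only at dependency files inside pipeline_dir.

    Hypothetical helper: the default file names are assumptions.
    """
    base = Path(pipeline_dir).resolve()
    args = []

    # Only add a flag when the file really exists in the pipeline folder.
    setup_file = base / setup_name
    if setup_file.is_file():
        args += ["--setup_file", str(setup_file)]

    requirements_file = base / requirements_name
    if requirements_file.is_file():
        args += ["--requirements_file", str(requirements_file)]

    return args
```

Assuming argv is a list of command-line arguments, the result could be appended before the options are built, for example PipelineOptions(argv + explicit_dependency_args(pipeline_dir)), so that the job submission does not depend on which folder the script happens to be launched from.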

Re: Dataflow job gets stuck

Posted by Sofia’s World <mm...@gmail.com>.
Hi,
The following code works for me. Mind you, I have amended the code
slightly.
A few questions:
1 - Where are you running it from? Local PC or GCP console?
2 - Has it ever run before?
3 - Can you show the command line you are using to kick off the process?

I have built your code using gcloud build, and run it using another yaml.
Below is the code I have used. Do you have a GitHub repo you can share so I
can play around with your code?

hth
 Marco

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

EDGAR_QUARTERLY_URL = 'gs://mm_dataflow_bucket/inputs/all_data_utilities_df.csv'


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    # Hand the arguments argparse did not recognise (runner, project, ...) to Beam.
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    logging.info('starting pipeline..')
    # The context manager runs the pipeline on exit and waits for it to finish.
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'ReadTextFile' >> beam.io.textio.ReadFromText(EDGAR_QUARTERLY_URL)
         | 'Log it out' >> beam.Map(logging.info))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
