Posted to user@beam.apache.org by OrielResearch Eila Arich-Landkof <ei...@orielresearch.org> on 2018/02/14 01:21:24 UTC

Fwd: dataflow HDF5 loading pipeline errors

Hello,

Any help will be greatly appreciated!!!

I am using Dataflow to process an H5 (HDF5 format
<https://support.hdfgroup.org/HDF5/>) file.

The H5 file was uploaded to Google Cloud Storage from:
https://amp.pharm.mssm.edu/archs4/download.html

H5 / HDF5 is a hierarchical data format for representing scientific data.


My code:

I have created a setup.py file that is based on the juliaset
<https://github.com/apache/beam/blob/5c74022da1bb8f8a822cf3c545f96f2903d175a4/sdks/python/apache_beam/examples/complete/juliaset/setup.py>
example that was referenced in one of the other tickets. My only change
there is the list of packages to install:

REQUIRED_PACKAGES = [
    'numpy',
    'h5py',
    'pandas',
    'tables',
    ]
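
For completeness, the rest of that setup.py is essentially the juliaset
skeleton with the package list above plugged in. A minimal sketch of what it
boils down to (the package name and version below are illustrative, and the
CustomCommands / apt-get machinery from the juliaset example is omitted):

import setuptools

REQUIRED_PACKAGES = [
    'numpy',
    'h5py',
    'pandas',
    'tables',
    ]

setuptools.setup(
    # Illustrative name/version, not necessarily the actual file's values.
    name='h5-preprocess-workflow',
    version='0.0.1',
    description='Reads an H5 file from GCS and writes a subset to text',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    )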

The pipeline is the following:

import numpy as np
import h5py
import pandas as pd
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class ReadGcsBlobs(beam.DoFn):
    def process(self, element, *args, **kwargs):
        from apache_beam.io.gcp import gcsio
        gcs = gcsio.GcsIO()
        yield (element, gcs.open(element).read())

class H5Preprocess(beam.DoFn):
    def process(self, element):
        logging.info('**********starting to read H5')
        hdf = h5py.File(element, 'r')
        logging.info('**********finished reading H5')
        expression = hdf['/data/']['expression']
        logging.info('**********finished reading the expression node')
        np_expression = expression[1:2,1:2]
        logging.info('**********subset the expression to numpy 2x2')
        yield (element, np_expression)

def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    parser = argparse.ArgumentParser(description="read from h5 blob and write to file")
    logging.info('**********finish with the parser')

    with beam.Pipeline(options=pipeline_options) as p:
            (p
                | 'Initialize' >> beam.Create(['gs://archs4/human_matrix.h5'])
                | 'Read-blobs' >> beam.ParDo(ReadGcsBlobs())
                | 'pre-process' >> beam.ParDo(H5Preprocess())
                | 'write' >> beam.io.WriteToText('gs://archs4/outputData.txt')
            )
    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

The execution command is the following:

python beam_try1.py --job-name beam-try1 --project orielresearch-188115 \
    --runner DataflowRunner --setup_file ./setup.py \
    --temp_location=gs://archs4/tmp --staging_location gs://archs4/staging

and the pipeline error is the following:

(5a4c72cfc5507714): Workflow failed. Causes: (3bde8bf810c652b2):
S04:Initialize/Read+Read-blobs+pre-process+write/Write/WriteImpl/WriteBundles/WriteBundles+write/Write/WriteImpl/Pair+write/Write/WriteImpl/WindowInto(WindowIntoFn)+write/Write/WriteImpl/GroupByKey/Reify+write/Write/WriteImpl/GroupByKey/Write
failed., (7b4a7abb1a692d12): A work item was attempted 4 times without
success. Each time the worker eventually lost contact with the
service. The work item was attempted on:
  beamapp-eila-0213182449-2-02131024-1621-harness-vf4f,
  beamapp-eila-0213182449-2-02131024-1621-harness-vf4f,
  beamapp-eila-0213182449-2-02131024-1621-harness-vf4f,
  beamapp-eila-0213182449-2-02131024-1621-harness-vf4f

Could you please advise what needs to be fixed?

Thanks,

Eila


-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/

Re: dataflow HDF5 loading pipeline errors

Posted by Ahmet Altay <al...@google.com>.
Hi Eila,

The error "work item was attempted 4 times without success" indicates that
some operation is consistently failing. You can find more information in
Dataflow worker logs [1] about the actual error.

I cannot tell for sure without looking at the logs, but I suspect your issue
is related to the process method of H5Preprocess. Unless you are using the
save_main_session flag, the module-level h5py import will result in a
NameError on the workers (more on this [2]). You can resolve it by importing
h5py locally (similar to the way you imported gcsio above).
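
For example, a minimal sketch of that change (this only addresses the
NameError; the rest of your process method, including the h5py.File call, is
kept as in your original code, with the hdf assignment that the following
line expects):

class H5Preprocess(beam.DoFn):
    def process(self, element):
        # Import inside process so the worker does not depend on the
        # pipeline's module-level imports being available.
        import logging
        import h5py
        logging.info('**********starting to read H5')
        hdf = h5py.File(element, 'r')
        logging.info('**********finished reading H5')
        expression = hdf['/data/']['expression']
        logging.info('**********finished reading the expression node')
        np_expression = expression[1:2, 1:2]
        logging.info('**********subset the expression to numpy 2x2')
        yield (element, np_expression)

Alternatively, you can keep the module-level imports and pickle the main
session instead:

    pipeline_options.view_as(SetupOptions).save_main_session = True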

For future questions related to getting support on Google Cloud Dataflow
please see their support page [3].

Thank you,
Ahmet

[1]
https://cloud.google.com/dataflow/pipelines/logging#cloud-dataflow-worker-log-example
[2] https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors
[3] https://cloud.google.com/dataflow/support
