Posted to commits@airflow.apache.org by "Jannik Franz (Jira)" <ji...@apache.org> on 2019/10/18 13:36:00 UTC
[jira] [Created] (AIRFLOW-5689) Side-Input in Python3 fails to pickle class
Jannik Franz created AIRFLOW-5689:
-------------------------------------
Summary: Side-Input in Python3 fails to pickle class
Key: AIRFLOW-5689
URL: https://issues.apache.org/jira/browse/AIRFLOW-5689
Project: Apache Airflow
Issue Type: Bug
Components: gcp
Affects Versions: 1.10.5
Environment: python3,beam 2.16.0
Reporter: Jannik Franz
When running an Apache Beam pipeline with Python 3 on Google Cloud Dataflow, side inputs don't work.
When testing the same pipeline with the local DirectRunner, there seems to be no issue.
{code:java}
class FlattenCustomActions(beam.PTransform):
    """ Transforms Facebook Day Actions
    Only retains actions with custom_conversions
    Flattens the actions
    Adds custom conversions names using a side input
    """

    def __init__(self, conversions):
        super(FlattenCustomActions, self).__init__()
        self.conversions = conversions

    def expand(self, input_or_inputs):
        return (
            input_or_inputs
            | "FlattenActions" >> beam.ParDo(flatten_filter_actions)
            | "AddConversionName" >> beam.Map(add_conversion_name, self.conversions)
        )

# ...
# in run():
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)

conversions_output = (
    p
    | "ReadConversions" >> ReadFromText(known_args.input_conversions, coder=JsonCoder())
    | TransformConversionMetadata()
)

(
    conversions_output
    | "WriteConversions"
    >> WriteCoerced(
        known_args.output_conversions,
        known_args.output_type,
        schema_path=BIGQUERY_SCHEMA_CONVERSIONS_PATH,
    )
)

(
    p
    | ReadFacebookJson(known_args.input, retain_root_fields=True)
    | FlattenCustomActions(beam.pvalue.AsList(conversions_output))
    | "WriteActions"
    >> WriteCoerced(
        known_args.output, known_args.output_type, schema_path=BIGQUERY_SCHEMA_ACTIONS_PATH
    )
)
{code}
I receive the following Traceback in Dataflow:
{code:java}
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 773, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 489, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 287, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 410, in load_session
    module = unpickler.load()
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute 'FlattenCustomActions' on <module 'dataflow_worker.start' from '/usr/local/lib/python3.6/site-packages/dataflow_worker/start.py'>
{code}
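For context, the AttributeError at the end of the traceback is the error the standard unpickler raises when a pickled object refers to a class by module and name and that name is missing from the module on the loading side. The following is only a minimal sketch of that general mechanism using the standard library's pickle. It does not use Beam, dill, or Dataflow, and it is not confirmed to be the cause of this issue. The module name hypothetical_main and the helpers make_module and demo are made up for illustration.

```python
import pickle
import sys
import types


def make_module(name, with_class):
    """Create and register a throwaway module (hypothetical helper)."""
    module = types.ModuleType(name)
    if with_class:
        # Define the class as if it had been written in that module's source.
        exec(
            "class FlattenCustomActions:\n"
            "    def __init__(self, conversions):\n"
            "        self.conversions = conversions\n",
            module.__dict__,
        )
    sys.modules[name] = module
    return module


def demo():
    """Pickle where the class exists, unpickle where it does not."""
    name = "hypothetical_main"  # stands in for the launching script's module
    try:
        # "Launcher" side: the class is defined in the module.
        launcher = make_module(name, with_class=True)
        payload = pickle.dumps(launcher.FlattenCustomActions(["purchase"]))

        # "Worker" side: a module with that name exists but lacks the class.
        make_module(name, with_class=False)
        try:
            pickle.loads(payload)
        except AttributeError as exc:
            return str(exc)
        return None
    finally:
        # Do not leave the throwaway module registered.
        sys.modules.pop(name, None)


if __name__ == "__main__":
    print(demo())
```

The pickle payload stores only the module name and the class name, not the class body, so loading depends on the class being reachable under the same module name in the receiving process.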