Posted to issues@beam.apache.org by "Krzysztof Korzeniewski (Jira)" <ji...@apache.org> on 2021/09/16 12:15:00 UTC

[jira] [Updated] (BEAM-12904) Python DataflowRunner uses always default app_profile_id when writing to BigTable, when using custom write fn

     [ https://issues.apache.org/jira/browse/BEAM-12904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Krzysztof Korzeniewski updated BEAM-12904:
------------------------------------------
    Affects Version/s: 2.28.0

> Python DataflowRunner uses always default app_profile_id when writing to BigTable, when using custom write fn
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-12904
>                 URL: https://issues.apache.org/jira/browse/BEAM-12904
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp, runner-dataflow
>    Affects Versions: 2.28.0, 2.32.0
>         Environment: Default Python SDK image for environment is apache/beam_python3.7_sdk:2.32.0
> Using provided Python SDK container image: gcr.io/cloud-dataflow/v1beta3/python37:2.32.0
>            Reporter: Krzysztof Korzeniewski
>            Priority: P0
>
>  
> There are two problems:
> 1. apache_beam.io.gcp.bigtableio.WriteToBigTable has no support for app profiles at all.
> 2. I added app profile support to a custom DoFn. The app_profile_id is passed in correctly, it works on DirectRunner, and the Dataflow logs even show the correct parameters, but DataflowRunner still uses the 'default' app_profile_id.
> Bigtable requires single-cluster routing to support transactional writes (read-modify-write, check-and-mutate). That is why, in one case, I need to use a custom app_profile_id.
> It is easy to reproduce by passing a nonexistent app_profile_id: DirectRunner fails with an error, while DataflowRunner silently uses 'default'.
> Custom write DoFn:
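> {code:python}
> import logging  # used by the commented-out app profile diagnostics below
> from datetime import datetime, timezone
>
> import apache_beam as beam
> from apache_beam.metrics import Metrics
> from apache_beam.transforms.display import DisplayDataItem
> from google.cloud.bigtable import Client, row_filters
>
> class BigTableWriteIfNotExistsConditionalFn(beam.DoFn):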
>   def __init__(self, project_id, instance_id, app_profile_id, table_id, column_family, column: str):
>     super(BigTableWriteIfNotExistsConditionalFn, self).__init__()
>     self.beam_options = {
>         'project_id': project_id,
>         'instance_id': instance_id,
>         'app_profile_id': app_profile_id,
>         'table_id': table_id,
>         'column_family': column_family,
>         'column': column,
>     }
>     self.table = None
>     self.written = Metrics.counter(self.__class__, 'Written Row')
>   def __getstate__(self):
>     return self.beam_options
>   def __setstate__(self, options):
>     self.beam_options = options
>     self.table = None
>     self.written = Metrics.counter(self.__class__, 'Written Row')
>   def start_bundle(self):
>     if self.table is None:
>       client = Client(project=self.beam_options['project_id'])
>       instance = client.instance(self.beam_options['instance_id'])
>       # To inspect the instance's app profiles, pass admin=True when creating the Client and uncomment below:
>       # for profile in instance.list_app_profiles():
>       #   logging.info('Profile name: %s', profile.name)
>       #   logging.info('Profile desc: %s', profile.description)
>       #   logging.info('Routing policy type: %s', profile.routing_policy_type)
>       #   logging.info('Cluster id: %s', profile.cluster_id)
>       #   logging.info('Transactional writes: %s', profile.allow_transactional_writes)
>       self.table = instance.table(table_id=self.beam_options['table_id'], app_profile_id=self.beam_options['app_profile_id'])
>   def process(self, kvmessage):
>     self.written.inc()
>     row_key, value = kvmessage
>     row_filter = row_filters.RowFilterChain(
>         filters=[row_filters.FamilyNameRegexFilter(self.beam_options['column_family']),
>                  row_filters.ColumnQualifierRegexFilter(self.beam_options['column']),
>                 ])
>     bt_row = self.table.conditional_row(row_key=row_key, filter_=row_filter)
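>     # state=False: apply this mutation only when the filter matches no cells,
>     # i.e. write the cell only if it does not exist yet (the default, state=True,
>     # would write only when the cell already exists).
>     params = {
>         'column_family_id': self.beam_options['column_family'],
>         'column': self.beam_options['column'],
>         'value': value[self.beam_options['column']],
>         'timestamp': datetime.fromtimestamp(0, timezone.utc),
>         'state': False,
>     }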
>     bt_row.set_cell(**params)
>     bt_row.commit()
>   def finish_bundle(self):
>     pass
>   def display_data(self):
>     return {
>         'projectId': DisplayDataItem(
>             self.beam_options['project_id'], label='Bigtable Project Id'),
>         'instanceId': DisplayDataItem(
>             self.beam_options['instance_id'], label='Bigtable Instance Id'),
>         'tableId': DisplayDataItem(
>             self.beam_options['table_id'], label='Bigtable Table Id')
>     }
> {code}
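The DoFn above carries app_profile_id to the workers only through its __getstate__/__setstate__ pair. A minimal offline sketch of that round trip follows; ProfileCarrier and the option values are hypothetical stand-ins, and plain pickle is used instead of the serializer Beam actually uses. It only shows that the value survives serialization, so it cannot reproduce the Dataflow fallback to 'default' itself.

```python
import pickle


class ProfileCarrier:
    """Hypothetical stand-in mirroring the DoFn's serialization pattern."""

    def __init__(self, project_id, instance_id, app_profile_id, table_id):
        self.beam_options = {
            'project_id': project_id,
            'instance_id': instance_id,
            'app_profile_id': app_profile_id,
            'table_id': table_id,
        }
        self.table = object()  # stands in for a live Bigtable table handle

    def __getstate__(self):
        # Only the plain option dict is serialized, never the table handle.
        return self.beam_options

    def __setstate__(self, options):
        self.beam_options = options
        self.table = None  # recreated lazily on the worker


original = ProfileCarrier('my-project', 'my-instance', 'single-cluster-profile', 'my-table')
restored = pickle.loads(pickle.dumps(original))
print(restored.beam_options['app_profile_id'])  # single-cluster-profile
print(restored.table)  # None
```

If the value survives this round trip, the fallback would have to happen after deserialization, when the table is created on the worker.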
>  
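Because DataflowRunner silently uses 'default' instead of failing on an unknown app_profile_id, one option is a fail-fast check before writing. The sketch below is hypothetical: ProfileInfo and check_app_profile are illustrative names, not Beam or Bigtable APIs. It assumes you have already fetched the instance's profiles (for example via instance.list_app_profiles() with an admin Client, as in the commented-out code) and mapped routing_policy_type to a boolean, e.g. by comparing it with google.cloud.bigtable.enums.RoutingPolicyType.SINGLE.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class ProfileInfo:
    """Hypothetical summary of an app profile's relevant settings."""
    app_profile_id: str
    single_cluster: bool              # routing policy is single-cluster
    allow_transactional_writes: bool


def check_app_profile(app_profile_id: str, profiles: Iterable[ProfileInfo]) -> ProfileInfo:
    """Return the matching profile, or raise if it cannot serve conditional writes."""
    by_id = {p.app_profile_id: p for p in profiles}
    if app_profile_id not in by_id:
        raise ValueError(f'unknown app_profile_id: {app_profile_id!r}')
    profile = by_id[app_profile_id]
    if not (profile.single_cluster and profile.allow_transactional_writes):
        raise ValueError(
            f'app profile {app_profile_id!r} must use single-cluster routing '
            'with transactional writes allowed for check-and-mutate')
    return profile


profiles = [
    ProfileInfo('multi-cluster-profile', single_cluster=False, allow_transactional_writes=False),
    ProfileInfo('txn-profile', single_cluster=True, allow_transactional_writes=True),
]
print(check_app_profile('txn-profile', profiles).app_profile_id)  # txn-profile
```

Calling such a check in the DoFn's setup would turn a silent fallback into a visible pipeline error, although it cannot stop the runner from substituting 'default' afterwards.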



--
This message was sent by Atlassian Jira
(v8.3.4#803005)