Posted to issues@beam.apache.org by "Krzysztof Korzeniewski (Jira)" <ji...@apache.org> on 2021/09/16 12:15:00 UTC
[jira] [Updated] (BEAM-12904) Python DataflowRunner uses always default app_profile_id when writing to BigTable, when using custom write fn
[ https://issues.apache.org/jira/browse/BEAM-12904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Krzysztof Korzeniewski updated BEAM-12904:
------------------------------------------
Affects Version/s: 2.28.0
> Python DataflowRunner uses always default app_profile_id when writing to BigTable, when using custom write fn
> -------------------------------------------------------------------------------------------------------------
>
> Key: BEAM-12904
> URL: https://issues.apache.org/jira/browse/BEAM-12904
> Project: Beam
> Issue Type: Bug
> Components: io-py-gcp, runner-dataflow
> Affects Versions: 2.28.0, 2.32.0
> Environment: Default Python SDK image for environment is apache/beam_python3.7_sdk:2.32.0
> Using provided Python SDK container image: gcr.io/cloud-dataflow/v1beta3/python37:2.32.0
> Reporter: Krzysztof Korzeniewski
> Priority: P0
>
>
> There are two issues:
> 1. apache_beam.io.gcp.bigtableio.WriteToBigTable has no support for app profiles at all.
> 2. I've added app profile support to a custom DoFn. The app_profile_id is passed correctly and works on DirectRunner, and the Dataflow logs even show the correct parameters, but writes on Dataflow still use the 'default' app_profile_id.
> Bigtable requires an app profile with single-cluster routing to support transactional writes (read-modify-write, check-and-mutate). That's why, in one case, I need to use a custom app_profile_id.
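Transactional writes such as check-and-mutate correspond to Bigtable's CheckAndMutateRow request: the server evaluates a predicate filter against the row, then applies the "true" mutations if the predicate matched any cell and the "false" mutations otherwise. The sketch below models those semantics in memory to show that a write-if-not-exists must register its mutation in the false branch (in the Python client, `ConditionalRow.set_cell(..., state=False)`). The helpers `check_and_mutate`, `has_cell` and `write_if_not_exists` are hypothetical and not part of google-cloud-bigtable.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical in-memory model of one Bigtable row: {(family, qualifier): value}.
Row = Dict[Tuple[str, str], bytes]
Mutation = Tuple[str, str, bytes]  # (family, qualifier, value)


def check_and_mutate(
    row: Row,
    predicate: Callable[[Row], bool],
    true_mutations: List[Mutation],
    false_mutations: List[Mutation],
) -> bool:
    """Apply true_mutations if the predicate matches, else false_mutations.

    Returns whether the predicate matched, like the predicate_matched field
    of a CheckAndMutateRow response.
    """
    matched = predicate(row)
    for family, qualifier, value in (true_mutations if matched else false_mutations):
        row[(family, qualifier)] = value
    return matched


def has_cell(family: str, qualifier: str) -> Callable[[Row], bool]:
    """Predicate roughly equivalent to chaining a family filter and a qualifier filter."""
    return lambda row: (family, qualifier) in row


def write_if_not_exists(row: Row, family: str, qualifier: str, value: bytes) -> bool:
    """Write the cell only when it is absent, so the mutation goes in the false branch."""
    matched = check_and_mutate(
        row,
        has_cell(family, qualifier),
        true_mutations=[],
        false_mutations=[(family, qualifier, value)],
    )
    return not matched  # True if the write was applied


row: Row = {}
write_if_not_exists(row, "cf", "col", b"first")   # applied: the cell was absent
write_if_not_exists(row, "cf", "col", b"second")  # skipped: the cell now exists
```

Putting the mutation in the true branch instead (the client's default `state=True`) would invert the behaviour and only overwrite cells that already exist.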
> The bug is easy to trigger by passing a non-existent app_profile_id: DirectRunner crashes with an error, while the Dataflow runner silently uses 'default'.
> Custom write func:
> {code:python}
> import logging
> from datetime import datetime, timezone
>
> import apache_beam as beam
> from apache_beam.metrics import Metrics
> from apache_beam.transforms.display import DisplayDataItem
> from google.cloud.bigtable import Client, row_filters
>
>
> class BigTableWriteIfNotExistsConditionalFn(beam.DoFn):
>     """Writes a cell only if the row has no cell in the given column yet."""
>
>     def __init__(self, project_id, instance_id, app_profile_id, table_id, column_family, column: str):
>         super(BigTableWriteIfNotExistsConditionalFn, self).__init__()
>         self.beam_options = {
>             'project_id': project_id,
>             'instance_id': instance_id,
>             'app_profile_id': app_profile_id,
>             'table_id': table_id,
>             'column_family': column_family,
>             'column': column,
>         }
>         self.table = None
>         self.written = Metrics.counter(self.__class__, 'Written Row')
>
>     def __getstate__(self):
>         # Pickle only the plain options; the Bigtable client is rebuilt on the worker.
>         return self.beam_options
>
>     def __setstate__(self, options):
>         self.beam_options = options
>         self.table = None
>         self.written = Metrics.counter(self.__class__, 'Written Row')
>
>     def start_bundle(self):
>         if self.table is None:
>             client = Client(project=self.beam_options['project_id'])
>             instance = client.instance(self.beam_options['instance_id'])
>             # To log the instance's app profiles, pass admin=True to Client() and uncomment:
>             # for profile in instance.list_app_profiles():
>             #     logging.info('Profile name: %s', profile.name)
>             #     logging.info('Profile desc: %s', profile.description)
>             #     logging.info('Routing policy type: %s', profile.routing_policy_type)
>             #     logging.info('Cluster id: %s', profile.cluster_id)
>             #     logging.info('Transactional writes: %s', profile.allow_transactional_writes)
>             self.table = instance.table(
>                 table_id=self.beam_options['table_id'],
>                 app_profile_id=self.beam_options['app_profile_id'])
>
>     def process(self, kvmessage):
>         self.written.inc()
>         row_key, value = kvmessage
>         # Predicate: the row already has a cell in column_family:column.
>         row_filter = row_filters.RowFilterChain(
>             filters=[row_filters.FamilyNameRegexFilter(self.beam_options['column_family']),
>                      row_filters.ColumnQualifierRegexFilter(self.beam_options['column']),
>                      ])
>         bt_row = self.table.conditional_row(row_key=row_key, filter_=row_filter)
>         params = {
>             'column_family_id': self.beam_options['column_family'],
>             'column': self.beam_options['column'],
>             'value': value[self.beam_options['column']],
>             'timestamp': datetime.fromtimestamp(0, timezone.utc),
>             # state=False: apply the mutation only when the predicate does NOT match,
>             # i.e. when the cell does not exist yet.
>             'state': False,
>         }
>         bt_row.set_cell(**params)
>         # ConditionalRow.commit() sends a CheckAndMutateRow (transactional) request.
>         bt_row.commit()
>
>     def finish_bundle(self):
>         pass
>
>     def display_data(self):
>         return {
>             'projectId': DisplayDataItem(
>                 self.beam_options['project_id'], label='Bigtable Project Id'),
>             'instanceId': DisplayDataItem(
>                 self.beam_options['instance_id'], label='Bigtable Instance Id'),
>             'tableId': DisplayDataItem(
>                 self.beam_options['table_id'], label='Bigtable Table Id'),
>         }
> {code}
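The DoFn ships only `beam_options` through its `__getstate__`/`__setstate__` hooks, and Beam's Python SDK pickles DoFns before sending them to workers (by default with dill, which honours the same hooks). A quick local check that app_profile_id survives that round trip is sketched below. It uses stdlib `pickle` as a stand-in for the SDK's serializer, and `ProfiledWriterStandIn`, `roundtrip`, `'single-cluster-profile'` and `'my-table'` are hypothetical names chosen for illustration.

```python
import pickle


class ProfiledWriterStandIn:
    """Hypothetical stand-in mirroring the DoFn's __getstate__/__setstate__ pattern."""

    def __init__(self, app_profile_id: str, table_id: str):
        self.options = {'app_profile_id': app_profile_id, 'table_id': table_id}
        self.table = object()  # placeholder for a client handle that must not be shipped

    def __getstate__(self):
        # Serialize only the plain options, as the DoFn does with beam_options.
        return self.options

    def __setstate__(self, options):
        # Restore the options; the handle is recreated lazily (start_bundle in the DoFn).
        self.options = options
        self.table = None


def roundtrip(obj):
    """Serialize and deserialize an object, standing in for the runner's pickling step."""
    return pickle.loads(pickle.dumps(obj))


restored = roundtrip(ProfiledWriterStandIn('single-cluster-profile', 'my-table'))
print(restored.options['app_profile_id'])  # single-cluster-profile
```

If the id survives the round trip, as here and as the Dataflow logs suggest, the fallback to 'default' presumably happens later, when the client builds the write request on the worker; this sketch does not establish where.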
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)