Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 21:49:39 UTC
[GitHub] [beam] damccorm opened a new issue, #21187: Python DataflowRunner uses always default app_profile_id when writing to BigTable, when using custom write fn
damccorm opened a new issue, #21187:
URL: https://github.com/apache/beam/issues/21187
There are two problems:
1. apache_beam.io.gcp.bigtableio.WriteToBigTable has no support for custom app profiles at all.
2. I added app profile support to a custom DoFn. The app_profile_id is passed correctly and works on DirectRunner, and the Dataflow logs even show the correct parameters, but DataflowRunner still uses the 'default' app_profile_id.
This is easy to trigger by passing a nonexistent app_profile_id: DirectRunner crashes with an error, while DataflowRunner falls back to 'default' and crashes if 'default' uses multi-cluster routing and/or has transactional writes disabled.
Bigtable requires single-cluster routing to support transactional writes (read-modify-write, check-and-mutate). That is why, in one case, I need to use a custom app_profile_id.
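The routing constraint above can be sketched as a small check. This is only an illustration: `AppProfileSettings`, `supports_conditional_writes`, and the profile id `single-cluster-tx` are hypothetical names, not part of Beam or the Bigtable client, and the two example profiles are assumed configurations rather than values read from a real instance.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AppProfileSettings:
    """Hypothetical, simplified view of a Bigtable app profile."""
    app_profile_id: str
    single_cluster_routing: bool
    allow_transactional_writes: bool


def supports_conditional_writes(profile: AppProfileSettings) -> bool:
    """Return True if check-and-mutate / read-modify-write can work.

    Transactional writes need single-cluster routing and must also be
    enabled on the profile itself.
    """
    return profile.single_cluster_routing and profile.allow_transactional_writes


# Assumed example: a 'default' profile with multi-cluster routing.
default_profile = AppProfileSettings(
    app_profile_id='default',
    single_cluster_routing=False,
    allow_transactional_writes=False,
)

# Assumed example: a custom profile (made-up id) set up for transactional writes.
custom_profile = AppProfileSettings(
    app_profile_id='single-cluster-tx',
    single_cluster_routing=True,
    allow_transactional_writes=True,
)
```

With these assumed settings, a conditional write can only succeed through the custom profile, which is why silently falling back to 'default' breaks the pipeline.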
Custom write function:
```
from datetime import datetime, timezone
import logging

import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.transforms.display import DisplayDataItem
from google.cloud.bigtable import Client, row_filters


class BigTableWriteIfNotExistsConditionalFn(beam.DoFn):
    def __init__(self, project_id, instance_id, app_profile_id, table_id,
                 column_family, column: str):
        super(BigTableWriteIfNotExistsConditionalFn, self).__init__()
        self.beam_options = {
            'project_id': project_id,
            'instance_id': instance_id,
            'app_profile_id': app_profile_id,
            'table_id': table_id,
            'column_family': column_family,
            'column': column,
        }
        self.table = None
        self.written = Metrics.counter(self.__class__, 'Written Row')

    def __getstate__(self):
        # Only the plain options dict is pickled; the table handle is not.
        return self.beam_options

    def __setstate__(self, options):
        self.beam_options = options
        self.table = None
        self.written = Metrics.counter(self.__class__, 'Written Row')

    def start_bundle(self):
        if self.table is None:
            client = Client(project=self.beam_options['project_id'])
            instance = client.instance(self.beam_options['instance_id'])
            # Add the admin=True param to the client initialization and
            # uncomment the lines below to log the available app profiles.
            # for profile in instance.list_app_profiles():
            #     logging.info('Profile name: %s', profile.name)
            #     logging.info('Profile desc: %s', profile.description)
            #     logging.info('Routing policy type: %s', profile.routing_policy_type)
            #     logging.info('Cluster id: %s', profile.cluster_id)
            #     logging.info('Transactional writes: %s', profile.allow_transactional_writes)
            self.table = instance.table(
                table_id=self.beam_options['table_id'],
                app_profile_id=self.beam_options['app_profile_id'])

    def process(self, kvmessage):
        self.written.inc()
        row_key, value = kvmessage
        # The filter matches an existing cell in the target column.
        row_filter = row_filters.RowFilterChain(
            filters=[
                row_filters.FamilyNameRegexFilter(self.beam_options['column_family']),
                row_filters.ColumnQualifierRegexFilter(self.beam_options['column']),
            ])
        bt_row = self.table.conditional_row(row_key=row_key, filter_=row_filter)
        # state=False applies the mutation only when the filter matches nothing,
        # i.e. when the cell does not exist yet.
        params = {
            'column_family_id': self.beam_options['column_family'],
            'column': self.beam_options['column'],
            'value': value,
            'timestamp': datetime.fromtimestamp(0, timezone.utc),
            'state': False,
        }
        bt_row.set_cell(**params)
        bt_row.commit()

    def finish_bundle(self):
        pass

    def display_data(self):
        return {
            'projectId': DisplayDataItem(
                self.beam_options['project_id'], label='Bigtable Project Id'),
            'instanceId': DisplayDataItem(
                self.beam_options['instance_id'], label='Bigtable Instance Id'),
            'tableId': DisplayDataItem(
                self.beam_options['table_id'], label='Bigtable Table Id'),
        }
```
It processes Tuple[str, str] messages, where the first string is the Bigtable row_key and the second is the cell value.
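For context, a minimal sketch of how such a DoFn might be fed from a pipeline. The helper names `to_kv_messages` and `build_pipeline` are hypothetical and not part of the original report; the Beam import is deferred so the message-shaping helper can be tried without Beam installed.

```python
from typing import Iterable, List, Tuple


def to_kv_messages(pairs: Iterable[Tuple[object, object]]) -> List[Tuple[str, str]]:
    """Normalize (row_key, value) pairs into the Tuple[str, str] shape
    that the DoFn's process() method unpacks."""
    messages = []
    for row_key, value in pairs:
        if not row_key:
            raise ValueError('row_key must not be empty')
        messages.append((str(row_key), str(value)))
    return messages


def build_pipeline(write_fn, pairs, options=None):
    """Build (but do not run) a pipeline that applies write_fn to each message.

    write_fn is assumed to be an instance of the custom DoFn shown above.
    """
    import apache_beam as beam  # deferred: only needed when building the pipeline

    pipeline = beam.Pipeline(options=options)
    _ = (
        pipeline
        | 'CreateMessages' >> beam.Create(to_kv_messages(pairs))
        | 'WriteIfNotExists' >> beam.ParDo(write_fn)
    )
    return pipeline
```

Running the same pipeline once with DirectRunner options and once with DataflowRunner options, using a nonexistent app_profile_id, should be enough to compare the two behaviours described in this report.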
Imported from Jira [BEAM-12904](https://issues.apache.org/jira/browse/BEAM-12904). Original Jira may contain additional context.
Reported by: mitgath.