Posted to issues@beam.apache.org by "Krzysztof Korzeniewski (Jira)" <ji...@apache.org> on 2021/09/16 11:39:00 UTC
[jira] [Created] (BEAM-12904) Python DataflowRunner uses always
default app_profile_id when writing to BigTable, when using custom write fn
Krzysztof Korzeniewski created BEAM-12904:
---------------------------------------------
Summary: Python DataflowRunner uses always default app_profile_id when writing to BigTable, when using custom write fn
Key: BEAM-12904
URL: https://issues.apache.org/jira/browse/BEAM-12904
Project: Beam
Issue Type: Bug
Components: io-py-gcp, runner-dataflow
Affects Versions: 2.32.0
Environment: Default Python SDK image for environment is apache/beam_python3.7_sdk:2.32.0
Using provided Python SDK container image: gcr.io/cloud-dataflow/v1beta3/python37:2.32.0
Reporter: Krzysztof Korzeniewski
There are two issues:
1. apache_beam.io.gcp.bigtableio.WriteToBigTable has no support for app profiles at all.
2. I've added app profile support to a custom DoFn. The app_profile_id is passed correctly and works on DirectRunner, and the Dataflow logs even show the correct parameters, but the Dataflow job still uses the 'default' app_profile_id.
Bigtable requires single-cluster routing to support transactional writes (read-modify-write, check-and-mutate). That's why I need a custom app_profile_id in this one case.
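A minimal sketch of checking whether a given app profile can serve such writes. It assumes profile objects shaped like the ones returned by instance.list_app_profiles() in the commented-out block of the DoFn below, i.e. exposing cluster_id and allow_transactional_writes. The helper name and the stand-in objects are hypothetical.

```python
from types import SimpleNamespace


def supports_transactional_writes(profile):
    """Return True if the profile pins one cluster and allows transactional writes.

    Assumption: `profile` exposes `cluster_id` and `allow_transactional_writes`,
    like the objects returned by `instance.list_app_profiles()`.
    """
    pinned_to_one_cluster = bool(getattr(profile, "cluster_id", None))
    allows_transactions = bool(getattr(profile, "allow_transactional_writes", False))
    return pinned_to_one_cluster and allows_transactions


# Stand-ins for real app profile objects, for illustration only.
multi_cluster = SimpleNamespace(cluster_id=None, allow_transactional_writes=None)
single_cluster = SimpleNamespace(cluster_id="my-cluster", allow_transactional_writes=True)
```

Running the helper over every profile of the instance would show which app_profile_id values are usable for check-and-mutate writes.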
The problem is easy to trigger by passing a nonexistent app_profile_id: DirectRunner crashes with an error, while the Dataflow runner uses 'default'.
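The check above can be sketched outside of Beam roughly as follows. This is an illustration, not the exact code from the failing job: the helper names open_table and conditional_write and the client_factory parameter are hypothetical, and the client library is imported lazily so the helper can also be exercised with a fake client.

```python
def open_table(project_id, instance_id, table_id, app_profile_id, client_factory=None):
    """Open a Bigtable table handle bound to a specific app profile."""
    if client_factory is None:
        # Imported lazily; requires the google-cloud-bigtable package.
        from google.cloud.bigtable import Client
        client_factory = Client
    client = client_factory(project=project_id)
    instance = client.instance(instance_id)
    # The app profile is attached to the table handle, as in the DoFn below.
    return instance.table(table_id, app_profile_id=app_profile_id)


def conditional_write(table, row_key, row_filter, column_family, column, value):
    """Issue one check-and-mutate write through the given table handle."""
    row = table.conditional_row(row_key=row_key, filter_=row_filter)
    row.set_cell(column_family, column, value)
    return row.commit()
```

Calling open_table with an app_profile_id that does not exist and then conditional_write on the result should fail locally if the profile is actually being used, which matches the DirectRunner behaviour described above.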
Custom write func:
{code:python}
from datetime import datetime, timezone

import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.transforms.display import DisplayDataItem
from google.cloud.bigtable import Client, row_filters


class BigTableWriteIfNotExistsConditionalFn(beam.DoFn):
    def __init__(self, project_id, instance_id, app_profile_id, table_id, column_family, column: str):
        super(BigTableWriteIfNotExistsConditionalFn, self).__init__()
        self.beam_options = {
            'project_id': project_id,
            'instance_id': instance_id,
            'app_profile_id': app_profile_id,
            'table_id': table_id,
            'column_family': column_family,
            'column': column,
        }
        self.table = None
        self.written = Metrics.counter(self.__class__, 'Written Row')

    def __getstate__(self):
        # Only the plain options dict is pickled and shipped to the workers.
        return self.beam_options

    def __setstate__(self, options):
        self.beam_options = options
        self.table = None
        self.written = Metrics.counter(self.__class__, 'Written Row')

    def start_bundle(self):
        if self.table is None:
            client = Client(project=self.beam_options['project_id'])
            instance = client.instance(self.beam_options['instance_id'])
            # Add the admin=True param in client initialization, import logging, and uncomment below
            # for profile in instance.list_app_profiles():
            #     logging.info('Profile name: %s', profile.name)
            #     logging.info('Profile desc: %s', profile.description)
            #     logging.info('Routing policy type: %s', profile.routing_policy_type)
            #     logging.info('Cluster id: %s', profile.cluster_id)
            #     logging.info('Transactional writes: %s', profile.allow_transactional_writes)
            self.table = instance.table(
                table_id=self.beam_options['table_id'],
                app_profile_id=self.beam_options['app_profile_id'])

    def process(self, kvmessage):
        self.written.inc()
        row_key, value = kvmessage
        # Predicate: matches when the target column already holds a cell.
        row_filter = row_filters.RowFilterChain(
            filters=[
                row_filters.FamilyNameRegexFilter(self.beam_options['column_family']),
                row_filters.ColumnQualifierRegexFilter(self.beam_options['column']),
            ])
        bt_row = self.table.conditional_row(row_key=row_key, filter_=row_filter)
        # NOTE: set_cell defaults to state=True, which applies the mutation when
        # the filter matches; pass state=False to write only when no cell exists.
        params = {
            'column_family_id': self.beam_options['column_family'],
            'column': self.beam_options['column'],
            'value': value[self.beam_options['column']],
            'timestamp': datetime.fromtimestamp(0, timezone.utc),
        }
        bt_row.set_cell(**params)
        bt_row.commit()

    def finish_bundle(self):
        pass

    def display_data(self):
        return {
            'projectId': DisplayDataItem(
                self.beam_options['project_id'], label='Bigtable Project Id'),
            'instanceId': DisplayDataItem(
                self.beam_options['instance_id'], label='Bigtable Instance Id'),
            'tableId': DisplayDataItem(
                self.beam_options['table_id'], label='Bigtable Table Id')
        }
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)