Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/10/06 13:34:00 UTC

[jira] [Work logged] (BEAM-12904) Python DataflowRunner uses always default app_profile_id when writing to BigTable, when using custom write fn

     [ https://issues.apache.org/jira/browse/BEAM-12904?focusedWorklogId=660917&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-660917 ]

ASF GitHub Bot logged work on BEAM-12904:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Oct/21 13:33
            Start Date: 06/Oct/21 13:33
    Worklog Time Spent: 10m 
      Work Description: codecov[bot] edited a comment on pull request #15560:
URL: https://github.com/apache/beam/pull/15560#issuecomment-925816779


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15560?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15560](https://codecov.io/gh/apache/beam/pull/15560?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1b33f49) into [master](https://codecov.io/gh/apache/beam/commit/4012a46d3aa7b2a4c628f1352c8b579733c71b41?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4012a46) will **increase** coverage by `0.28%`.
   > The diff coverage is `66.66%`.
   
   > :exclamation: Current head 1b33f49 differs from pull request most recent head 57117a4. Consider uploading reports for the commit 57117a4 to get more accurate results.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15560      +/-   ##
   ==========================================
   + Coverage   83.47%   83.76%   +0.28%     
   ==========================================
     Files         445      444       -1     
     Lines       61074    60337     -737     
   ==========================================
   - Hits        50981    50539     -442     
   + Misses      10093     9798     -295     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15560?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/gcp/bigtableio.py](https://codecov.io/gh/apache/beam/pull/15560/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3RhYmxlaW8ucHk=) | `43.47% <66.66%> (-35.75%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15560/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [...teractive/testing/integration/notebook\_executor.py](https://codecov.io/gh/apache/beam/pull/15560/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS90ZXN0aW5nL2ludGVncmF0aW9uL25vdGVib29rX2V4ZWN1dG9yLnB5) | `28.86% <0.00%> (-1.04%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/external.py](https://codecov.io/gh/apache/beam/pull/15560/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9leHRlcm5hbC5weQ==) | `77.70% <0.00%> (-0.81%)` | :arrow_down: |
   | [sdks/python/apache\_beam/internal/metrics/metric.py](https://codecov.io/gh/apache/beam/pull/15560/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9tZXRyaWMucHk=) | `90.42% <0.00%> (-0.58%)` | :arrow_down: |
   | [sdks/python/apache\_beam/runners/direct/executor.py](https://codecov.io/gh/apache/beam/pull/15560/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvZXhlY3V0b3IucHk=) | `96.46% <0.00%> (-0.55%)` | :arrow_down: |
   | [.../runners/portability/fn\_api\_runner/translations.py](https://codecov.io/gh/apache/beam/pull/15560/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3RyYW5zbGF0aW9ucy5weQ==) | `92.71% <0.00%> (-0.50%)` | :arrow_down: |
   | [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/15560/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9lbnZpcm9ubWVudC5weQ==) | `91.39% <0.00%> (-0.33%)` | :arrow_down: |
   | [sdks/python/apache\_beam/typehints/schemas.py](https://codecov.io/gh/apache/beam/pull/15560/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3NjaGVtYXMucHk=) | `94.20% <0.00%> (-0.25%)` | :arrow_down: |
   | ... and [49 more](https://codecov.io/gh/apache/beam/pull/15560/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15560?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15560?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [4012a46...57117a4](https://codecov.io/gh/apache/beam/pull/15560?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
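The headline numbers in the Codecov "Coverage Diff" table are internally consistent. Coverage is hits divided by tracked lines, and in this report hits plus misses equals lines on both sides (no partial lines are listed). The following minimal Python check uses only the figures from the report. Codecov reports the delta as +0.28%, while the two coverages differ by about 0.287 points. That matches truncating the difference to two decimals rather than rounding it; this is inferred from the numbers, not from Codecov documentation.

```python
import math

# Figures copied from the Codecov "Coverage Diff" table.
report = {
    "master": {"lines": 61074, "hits": 50981, "misses": 10093},
    "#15560": {"lines": 60337, "hits": 50539, "misses": 9798},
}


def coverage_percent(hits: int, lines: int) -> float:
    """Line coverage as a percentage: covered lines / tracked lines."""
    return 100.0 * hits / lines


for name, r in report.items():
    # In this report every tracked line is either a hit or a miss.
    assert r["hits"] + r["misses"] == r["lines"]
    print(f"{name}: {coverage_percent(r['hits'], r['lines']):.2f}%")

before = coverage_percent(report["master"]["hits"], report["master"]["lines"])
after = coverage_percent(report["#15560"]["hits"], report["#15560"]["lines"])
delta = after - before  # about 0.287 percentage points

# Truncating (not rounding) to two decimals reproduces the reported +0.28%.
print(f"delta: +{math.floor(delta * 100) / 100:.2f}%")
```

This prints `master: 83.47%`, `#15560: 83.76%` and `delta: +0.28%`. Rounding the delta instead would give 0.29.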
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



Issue Time Tracking
-------------------

    Worklog Id:     (was: 660917)
    Time Spent: 1h 40m  (was: 1.5h)

> Python DataflowRunner uses always default app_profile_id when writing to BigTable, when using custom write fn
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-12904
>                 URL: https://issues.apache.org/jira/browse/BEAM-12904
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-py-gcp, runner-dataflow
>    Affects Versions: 2.28.0, 2.32.0
>         Environment: Default Python SDK image for environment is apache/beam_python3.7_sdk:2.32.0
> Using provided Python SDK container image: gcr.io/cloud-dataflow/v1beta3/python37:2.32.0
>            Reporter: Krzysztof Korzeniewski
>            Priority: P2
>              Labels: GCP
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
>  
> There are two problems:
>  1. apache_beam.io.gcp.bigtableio.WriteToBigTable has no support for custom app profiles at all.
>  2. I've added app profile support to a custom DoFn. The app_profile_id is passed correctly, it works on the DirectRunner, and the Dataflow logs even show the correct parameter values, but the DataflowRunner still uses the 'default' app_profile_id.
>  This is easy to trigger by passing a non-existent app_profile_id: the DirectRunner crashes with an error, while the DataflowRunner uses 'default' and crashes if the 'default' profile uses multi-cluster routing and/or has transactional writes disabled.
>  Bigtable needs single-cluster routing to support transactional writes (read-modify-write, check-and-mutate). That's why, in one case, I need to use a custom app_profile_id.
>  Custom write function:
> {code:python}
> from datetime import datetime, timezone
> import logging
> import apache_beam as beam
> from apache_beam.metrics import Metrics
> from apache_beam.transforms.display import DisplayDataItem
> from google.cloud.bigtable import Client, row_filters
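> # Writes the value only if the row has no cell yet in column_family:column
> # (check-and-mutate), using the given app profile.
> class BigTableWriteIfNotExistsConditionalFn(beam.DoFn):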
>   def __init__(self, project_id, instance_id, app_profile_id, table_id, column_family, column: str):
>     super(BigTableWriteIfNotExistsConditionalFn, self).__init__()
>     self.beam_options = {
>         'project_id': project_id,
>         'instance_id': instance_id,
>         'app_profile_id': app_profile_id,
>         'table_id': table_id,
>         'column_family': column_family,
>         'column': column,
>     }
>     self.table = None
>     self.written = Metrics.counter(self.__class__, 'Written Row')
>   # Pickle only the plain config dict; the Bigtable table handle is rebuilt lazily.
>   def __getstate__(self):
>     return self.beam_options
>   def __setstate__(self, options):
>     self.beam_options = options
>     self.table = None
>     self.written = Metrics.counter(self.__class__, 'Written Row')
>   def start_bundle(self):
>     # Lazily create the table handle, bound to the requested app profile.
>     if self.table is None:
>       client = Client(project=self.beam_options['project_id'])
>       instance = client.instance(self.beam_options['instance_id'])
>       # To list the instance's app profiles, pass admin=True when creating
>       # the Client and uncomment the lines below:
>       # for profile in instance.list_app_profiles():
>       #   logging.info('Profile name: %s', profile.name)
>       #   logging.info('Profile desc: %s', profile.description)
>       #   logging.info('Routing policy type: %s', profile.routing_policy_type)
>       #   logging.info('Cluster id: %s', profile.cluster_id)
>       #   logging.info('Transactional writes: %s', profile.allow_transactional_writes)
>       self.table = instance.table(
>           table_id=self.beam_options['table_id'],
>           app_profile_id=self.beam_options['app_profile_id'])
>   def process(self, kvmessage):
>     self.written.inc()
>     row_key, value = kvmessage
>     # The filter matches existing cells in column_family:column.
>     row_filter = row_filters.RowFilterChain(
>         filters=[row_filters.FamilyNameRegexFilter(self.beam_options['column_family']),
>                  row_filters.ColumnQualifierRegexFilter(self.beam_options['column']),
>                 ])
>     bt_row = self.table.conditional_row(row_key=row_key, filter_=row_filter)
>     # state=False: apply the mutation only if the filter matched no cells.
>     bt_row.set_cell(
>         column_family_id=self.beam_options['column_family'],
>         column=self.beam_options['column'],
>         value=value,
>         timestamp=datetime.fromtimestamp(0, timezone.utc),
>         state=False)
>     bt_row.commit()
>   def finish_bundle(self):
>     pass
>   def display_data(self):
>     return {
>         'projectId': DisplayDataItem(
>             self.beam_options['project_id'], label='Bigtable Project Id'),
>         'instanceId': DisplayDataItem(
>             self.beam_options['instance_id'], label='Bigtable Instance Id'),
>         'tableId': DisplayDataItem(
>             self.beam_options['table_id'], label='Bigtable Table Id')
>     }
> {code}
> It processes Tuple[str, str] messages, where the first string is the Bigtable row key and the second is the cell value.
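
The write in `process()` relies on Bigtable check-and-mutate semantics. `conditional_row(..., filter_=...)` evaluates the filter against the row. A `set_cell(..., state=False)` mutation is applied only when the filter matches no cells, so the cell is written only if that column does not exist yet. `commit()` reports whether the filter matched. As the report notes, this kind of transactional write needs an app profile with single-cluster routing.

The sketch below is a minimal in-memory model of that behaviour. It assumes a plain dict in place of a Bigtable table and exact family/column matching in place of the regex filters. `FakeConditionalTable` and `write_if_not_exists` are hypothetical names, not part of `google-cloud-bigtable`.

```python
from typing import Dict, Tuple

# Hypothetical in-memory "table": (row_key, family, column) -> value.
Cell = Tuple[str, str, str]


class FakeConditionalTable:
    """Models CheckAndMutateRow with a single family:column predicate."""

    def __init__(self) -> None:
        self.cells: Dict[Cell, str] = {}

    def check_and_mutate(self, row_key: str, family: str, column: str,
                         value: str, state: bool) -> bool:
        # Predicate: does the row already have a cell in family:column?
        matched = (row_key, family, column) in self.cells
        # Mutations with state=True run when the predicate matched,
        # mutations with state=False run when it did not.
        if matched == state:
            self.cells[(row_key, family, column)] = value
        return matched  # like commit(): whether the filter matched


def write_if_not_exists(table: FakeConditionalTable, row_key: str,
                        value: str) -> bool:
    """Mirrors the DoFn's process(): write only when the cell is absent."""
    return table.check_and_mutate(row_key, "cf", "col", value, state=False)


table = FakeConditionalTable()
for row_key, value in [("user#1", "first"), ("user#1", "second"), ("user#2", "x")]:
    write_if_not_exists(table, row_key, value)
print(table.cells)
# {('user#1', 'cf', 'col'): 'first', ('user#2', 'cf', 'col'): 'x'}
```

The second write to `user#1` is skipped because the predicate already matches. This is the "write if not exists" behaviour the DoFn relies on.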

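The custom DoFn pickles only its plain configuration dict (`beam_options`). It sets `self.table` back to `None` on unpickling and rebuilds the handle in `start_bundle()`, so `app_profile_id` should travel with the serialized DoFn to the workers. The following is a minimal, standard-library sketch of that `__getstate__`/`__setstate__` pattern. `LazyClientFn`, `connect()`, the `object()` placeholder and the `my-*` / `single-cluster-profile` values are hypothetical stand-ins, not Beam or Bigtable APIs. The sketch only shows that the configuration survives a pickle round trip. It does not explain why the DataflowRunner fell back to the default profile.

```python
import pickle


class LazyClientFn:
    """Hypothetical stand-in for the DoFn: config is pickled, the handle is not."""

    def __init__(self, project_id, instance_id, app_profile_id, table_id):
        self.beam_options = {
            "project_id": project_id,
            "instance_id": instance_id,
            "app_profile_id": app_profile_id,
            "table_id": table_id,
        }
        self.table = None  # created lazily, like in start_bundle()

    def __getstate__(self):
        # Only the plain, picklable configuration is serialized.
        return self.beam_options

    def __setstate__(self, options):
        # Called on unpickling instead of __init__: restore config, drop handle.
        self.beam_options = options
        self.table = None

    def connect(self):
        # Placeholder for Client(...).instance(...).table(..., app_profile_id=...).
        if self.table is None:
            self.table = object()
        return self.beam_options["app_profile_id"]


fn = LazyClientFn("my-project", "my-instance", "single-cluster-profile", "my-table")
fn.connect()  # the local copy now holds a handle
restored = pickle.loads(pickle.dumps(fn))
print(restored.table)                           # None: the handle was not pickled
print(restored.beam_options["app_profile_id"])  # single-cluster-profile
```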


--
This message was sent by Atlassian Jira
(v8.3.4#803005)