Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 18:21:50 UTC

[GitHub] [beam] damccorm opened a new issue, #20632: Writing to BigQuery raises a TypeError "GlobalWindow -> ._IntervalWindowBase"

damccorm opened a new issue, #20632:
URL: https://github.com/apache/beam/issues/20632

   I have a pipeline that reads from a PubSub topic, does some transforms and then writes to BigQuery. The exception message is this: `Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase`
   
   You can find the main part of the pipeline here [https://gist.github.com/PlugaruT/4666406bd8792b7b196dc1519c8885a2](https://gist.github.com/PlugaruT/4666406bd8792b7b196dc1519c8885a2)
   
   The first part reads from Pub/Sub, and then I apply this PTransform.
   
   And here is the gigantic stack trace: [https://gist.github.com/PlugaruT/52bf3834eec95fd5cc5779d3332e1433](https://gist.github.com/PlugaruT/52bf3834eec95fd5cc5779d3332e1433)
   
    
   
   I've tried downgrading to version 2.24.0, but this still happens.
   
   Also, I can't reproduce the exception locally when I run the pipeline with DirectRunner. It happens only when running on Dataflow.
   
    
   
    
   
   Imported from Jira [BEAM-11252](https://issues.apache.org/jira/browse/BEAM-11252). Original Jira may contain additional context.
   Reported by: PlugaruT.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Sonins commented on issue #20632: Writing to BigQuery raises a TypeError "GlobalWindow -> ._IntervalWindowBase"

Posted by GitBox <gi...@apache.org>.
Sonins commented on issue #20632:
URL: https://github.com/apache/beam/issues/20632#issuecomment-1248533832

   If you're having the same trouble and want to work around it, insert `beam.WindowInto(beam.window.GlobalWindows())` somewhere between `beam.WindowInto(beam.window.FixedWindows())` and `beam.io.WriteToBigQuery()`.
   
   For example, applied to the test code from my earlier comment:
   
   ```python
   class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
   ...
     def test_bigquery_dead_letter(self):
       runner = DataflowRunner()
   
       self.default_properties.append("--streaming")
   
       with beam.Pipeline(runner=runner, options=PipelineOptions(self.default_properties)) as p:
         (
           p
           # with_attributes=True yields PubsubMessage objects, so x.attributes is available
           | beam.io.ReadFromPubSub(topic='projects/project/topics/topic', with_attributes=True)
           | beam.WindowInto(beam.window.FixedWindows(2))
           | beam.GroupBy(lambda x: x.attributes['table'])
           | "global window" >> beam.WindowInto(beam.window.GlobalWindows())  # Insert this!
           # | beam.FlatMap(lambda x: x)  # I think you can't use FlatMap on global-windowed values
           | beam.Map(lambda x: x)
           | beam.io.WriteToBigQuery('some.table')
         )
   ```
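
The reasoning behind this workaround can be sketched with a toy, dependency-free model. The step labels and the `declared_windowfn` / `failed_rows_fit` helpers are hypothetical illustrations, not Beam APIs. The model assumes only `WindowInto` changes a PCollection's declared window function while every other step inherits it, and, following the analysis in this thread, that failed rows are emitted in the global window. Under those assumptions, re-windowing into `GlobalWindows()` before the write makes the declared windowing agree with the failed rows' actual window.

```python
from typing import List, Optional, Tuple

# Each step is (label, window function it sets, or None if it inherits one).
Step = Tuple[str, Optional[str]]


def declared_windowfn(steps: List[Step], initial: str = "GlobalWindows") -> str:
    """Toy propagation rule: only WindowInto-style steps change the declared
    window function; every other step inherits it from its input."""
    current = initial
    for _label, windowfn in steps:
        if windowfn is not None:
            current = windowfn
    return current


def failed_rows_fit(steps: List[Step]) -> bool:
    # Assumption from this thread: failed rows are emitted in the global
    # window, so they only match a PCollection declared as GlobalWindows.
    return declared_windowfn(steps) == "GlobalWindows"


original = [
    ("ReadFromPubSub", None),
    ("WindowInto(FixedWindows(2))", "FixedWindows"),
    ("GroupBy", None),
    ("FlatMap", None),
]
workaround = original[:3] + [
    ("WindowInto(GlobalWindows())", "GlobalWindows"),
    ("Map", None),
]

print(declared_windowfn(original), failed_rows_fit(original))
print(declared_windowfn(workaround), failed_rows_fit(workaround))
```

Note that the workaround also drops the fixed windows for everything downstream of the re-windowing step, so it only fits pipelines that don't need per-window semantics at the write.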



[GitHub] [beam] kubaj commented on issue #20632: Writing to BigQuery raises a TypeError "GlobalWindow -> ._IntervalWindowBase"

Posted by GitBox <gi...@apache.org>.
kubaj commented on issue #20632:
URL: https://github.com/apache/beam/issues/20632#issuecomment-1237252512

   Having the same issue with the `v2.41.0` SDK. Also, as mentioned in the Jira ticket [BEAM-11252](https://issues.apache.org/jira/browse/BEAM-11252), the dead-letter pattern cannot be used.
   
   I'm using DataflowRunner.



[GitHub] [beam] Sonins commented on issue #20632: Writing to BigQuery raises a TypeError "GlobalWindow -> ._IntervalWindowBase"

Posted by GitBox <gi...@apache.org>.
Sonins commented on issue #20632:
URL: https://github.com/apache/beam/issues/20632#issuecomment-1248529754

   I'm having the same problem. Here's what I've found so far.
   
   As PlugaruT mentioned, I think the problem occurs because `BigQueryWriteFn` [returns failed rows as `GlobalWindows.windowed_value`](https://github.com/apache/beam/blob/94405e6c4911669532b3648e91f2f5c5b58e5d26/sdks/python/apache_beam/io/gcp/bigquery.py#L1898-L1910), i.e. it emits them in the global window:
   
   ```python
   ...
       return itertools.chain(
           [
               pvalue.TaggedOutput(
                   BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
                   GlobalWindows.windowed_value((destination, row, err)))
               for row, err in failed_rows
           ],
           [
               pvalue.TaggedOutput(
                   BigQueryWriteFn.FAILED_ROWS,
                   GlobalWindows.windowed_value((destination, row)))
               for row, unused_err in failed_rows
           ])
   ```
   
   while the pipeline believes these outputs use the fixed-size windowing of the input.
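
The mismatch described above can be sketched with a dependency-free toy model. It assumes, as the logs under Test Code show, that each output PCollection of the write step inherits the input's `FixedWindows` windowing, while the failed-row branch re-emits rows in the global window. `GlobalWindow`, `IntervalWindow`, `WindowedValue`, `assign_fixed_window`, and `toy_write_fn` are simplified, hypothetical stand-ins, not Beam's classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class GlobalWindow:
    """Toy stand-in for Beam's single global window (not Beam's class)."""


@dataclass(frozen=True)
class IntervalWindow:
    """Toy stand-in for a fixed-size window covering [start, end)."""
    start: int
    end: int


@dataclass(frozen=True)
class WindowedValue:
    value: object
    window: object


def assign_fixed_window(timestamp: int, size: int) -> IntervalWindow:
    # Fixed windows put each timestamp into [start, start + size).
    start = timestamp - timestamp % size
    return IntervalWindow(start, start + size)


def toy_write_fn(element: WindowedValue) -> WindowedValue:
    """Hypothetical write step: successful rows keep their input window,
    failed rows are re-emitted in the global window, mirroring the
    GlobalWindows.windowed_value(...) calls quoted above."""
    row, failed = element.value
    if failed:
        return WindowedValue(("FailedRows", row), GlobalWindow())
    return WindowedValue(("Ok", row), element.window)


# The declared windowing of every output is inherited from the input
# (fixed windows of size 2), but failed rows actually carry GlobalWindow.
declared_window_type = IntervalWindow
inputs = [
    WindowedValue(({"id": 1}, False), assign_fixed_window(5, 2)),
    WindowedValue(({"id": 2}, True), assign_fixed_window(7, 2)),
]
outputs = [toy_write_fn(e) for e in inputs]
mismatched = [o for o in outputs if not isinstance(o.window, declared_window_type)]
print(mismatched)
```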
   
   ## Test Code
   If you add logging code like the following at [dataflow_runner.py:924](https://github.com/apache/beam/blob/94405e6c4911669532b3648e91f2f5c5b58e5d26/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L924),
   ```python
   def run_ParDo(self, transform_node, options):
       transform = transform_node.transform
       input_tag = transform_node.inputs[0].tag
       input_step = self._cache.get_pvalue(transform_node.inputs[0])
   
       # Logging code start #
       for output in transform_node.outputs.values():
         _LOGGER.debug("transform_node.output: %s windowing: %s", output, output.windowing)
       # Logging code ends #
   ...
   ```
   
   and then add the following test to `DataflowRunnerTest` in [dataflow_runner_test.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py) and run it:
   ```python
   class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
   ...
     def test_bigquery_dead_letter(self):
       runner = DataflowRunner()
   
       self.default_properties.append("--streaming")
   
       with beam.Pipeline(runner=runner, options=PipelineOptions(self.default_properties)) as p:
         (
           p
           # with_attributes=True yields PubsubMessage objects, so x.attributes is available
           | beam.io.ReadFromPubSub(topic='projects/project/topics/topic', with_attributes=True)
           | beam.WindowInto(beam.window.FixedWindows(2))
           | beam.GroupBy(lambda x: x.attributes['table'])
           | beam.FlatMap(lambda x: x)
           | beam.io.WriteToBigQuery('some.table')
         )
   ```
   
   it prints output like the following:
   
   ```
   pytest apache_beam/runners/dataflow/dataflow_runner_test.py::DataflowRunnerTest::test_bigquery_dead_letter --log-cli-level=DEBUG --full-trace
   ================================================================ test session starts ================================================================
   platform linux -- Python 3.8.14, pytest-7.1.2, pluggy-1.0.0
   rootdir: /home/heegwan/repositories/beam/sdks/python, configfile: pytest.ini
   collected 1 item                                                                                                                                    
   
   apache_beam/runners/dataflow/dataflow_runner_test.py::DataflowRunnerTest::test_bigquery_dead_letter 
   ------------------------------------------------------------------- live log call -------------------------------------------------------------------
   ...
   
   DEBUG    apache_beam.runners.dataflow.dataflow_runner:dataflow_runner.py:930 transform_node.output: PCollection[WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).None] windowing: Windowing(<apache_beam.transforms.window.FixedWindows object at 0x7fb4ee236ac0>, DefaultTrigger(), 1, 1, None)
   DEBUG    apache_beam.runners.dataflow.dataflow_runner:dataflow_runner.py:930 transform_node.output: PCollection[WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRows] windowing: Windowing(<apache_beam.transforms.window.FixedWindows object at 0x7fb4ee236ac0>, DefaultTrigger(), 1, 1, None)
   DEBUG    apache_beam.runners.dataflow.dataflow_runner:dataflow_runner.py:930 transform_node.output: PCollection[WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRowsWithErrors] windowing: Windowing(<apache_beam.transforms.window.FixedWindows object at 0x7fb4ee236ac0>, DefaultTrigger(), 1, 1, None)
   PASSED
   ```
   
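   As you can see, all three outputs of `ParDo(BigQueryWriteFn)` (the main output, `FailedRows`, and `FailedRowsWithErrors`) report `FixedWindows` windowing, even though the failed rows are emitted in the global window.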
   
   Maybe this is normal behavior, in which case perhaps only the Dataflow runner has trouble encoding a `GlobalWindow` with a coder that expects an `_IntervalWindowBase`.
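
The coder hypothesis above can be illustrated with a toy model, assuming (as Beam does for windowed PCollections) that the window coder is chosen from the PCollection's declared window function rather than from each element's actual window. `FixedWindows` then gets an interval-window coder that can only serialize `(start, end)` pairs, so a row emitted in the global window fails with a `TypeError` of the same shape as the one reported here. The classes, `WINDOW_CODERS`, and the `encode_*` functions below are hypothetical simplifications, not Beam's implementation.

```python
import struct
from dataclasses import dataclass


@dataclass(frozen=True)
class GlobalWindow:
    """Toy stand-in for Beam's global window."""


@dataclass(frozen=True)
class IntervalWindow:
    """Toy stand-in for an interval window [start, end) in milliseconds."""
    start: int
    end: int


def encode_interval_window(window) -> bytes:
    # A toy interval-window coder only knows how to write (start, end) pairs.
    if not isinstance(window, IntervalWindow):
        raise TypeError(f"Cannot convert {type(window).__name__} to IntervalWindow")
    return struct.pack(">qq", window.start, window.end)


def encode_global_window(window) -> bytes:
    # There is only one global window, so the toy coder writes nothing.
    if not isinstance(window, GlobalWindow):
        raise TypeError(f"Cannot convert {type(window).__name__} to GlobalWindow")
    return b""


# Window coder picked from the *declared* window function, not the element.
WINDOW_CODERS = {
    "FixedWindows": encode_interval_window,
    "GlobalWindows": encode_global_window,
}


def encode_window(declared_windowfn: str, window) -> bytes:
    return WINDOW_CODERS[declared_windowfn](window)


print(len(encode_window("FixedWindows", IntervalWindow(0, 2000))))
try:
    # A failed row emitted in the global window, on a FixedWindows PCollection.
    encode_window("FixedWindows", GlobalWindow())
except TypeError as err:
    print(err)
```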
   

