Posted to github@beam.apache.org by "crbl1122 (via GitHub)" <gi...@apache.org> on 2024/03/01 08:31:30 UTC

[I] [Bug]: Caused by: org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BEAM_LOGICAL. All the inputs have relevant nodes, however the cost is still infinite. [beam]

crbl1122 opened a new issue, #30466:
URL: https://github.com/apache/beam/issues/30466

   ### What happened?
   
   I use the SqlTransform component in an Apache Beam pipeline running on Dataflow.
   If I add a second windowed aggregate to the SQL query, I get the following error:
   
   `
   RuntimeError: org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to convert query 
   ....
   Caused by: org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BEAM_LOGICAL. All the inputs have relevant nodes, however the cost is still infinite.`
   
   This query works:
   
   ```
   windowing_query = """SELECT DATE_STR, SUBS_ID, (NUM_1 + NUM_2 + NUM_3) AS TOTAL_COST,
                       AVG(NUM_1) OVER (w ROWS 2 PRECEDING) AS NUM_1_sliding_3M
                       FROM PCOLLECTION 
                       WINDOW w AS (PARTITION BY SUBS_ID ORDER BY DATE_STR)"""
   ```
   
   This query fails:
   
   ```
   windowing_query = """SELECT DATE_STR, SUBS_ID, (NUM_1 + NUM_2 + NUM_3) AS TOTAL_COST,
                       AVG(NUM_1) OVER (w ROWS 2 PRECEDING) AS NUM_1_sliding_3M,
                       AVG(NUM_2) OVER (w ROWS 2 PRECEDING) AS NUM_2_sliding_3M
                       FROM PCOLLECTION 
                       WINDOW w AS (PARTITION BY SUBS_ID ORDER BY DATE_STR)"""
   ```
   The only difference between the two queries is this line:
   `AVG(NUM_2) OVER (w ROWS 2 PRECEDING) AS NUM_2_sliding_3M`
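
To make the expected result concrete, here is a minimal plain-Python sketch of what the failing query is meant to compute. It does not use Beam or Calcite, so it says nothing about why the planner fails. The function name `sliding_averages` and its parameters are hypothetical, introduced only for illustration, and it assumes `DATE_STR` values sort correctly as strings (for example ISO-style `YYYY-MM`).

```python
from collections import defaultdict, deque


def sliding_averages(rows, columns=("NUM_1", "NUM_2"), window=3):
    """Emulate AVG(col) OVER (PARTITION BY SUBS_ID ORDER BY DATE_STR ROWS 2 PRECEDING).

    `rows` is a list of dicts with keys DATE_STR, SUBS_ID, NUM_1, NUM_2, NUM_3.
    Hypothetical helper: not part of Beam, only a reference for the expected output.
    """
    # One bounded buffer per (partition, column). maxlen=3 keeps the current
    # row plus the 2 preceding rows, matching "ROWS 2 PRECEDING".
    buffers = defaultdict(lambda: {c: deque(maxlen=window) for c in columns})
    out = []
    # Assumption: DATE_STR sorts chronologically when compared as a string.
    for row in sorted(rows, key=lambda r: (r["SUBS_ID"], r["DATE_STR"])):
        result = {
            "DATE_STR": row["DATE_STR"],
            "SUBS_ID": row["SUBS_ID"],
            "TOTAL_COST": row["NUM_1"] + row["NUM_2"] + row["NUM_3"],
        }
        for c in columns:
            buf = buffers[row["SUBS_ID"]][c]
            buf.append(row[c])
            # Early rows in a partition average over fewer than 3 values.
            result[f"{c}_sliding_3M"] = sum(buf) / len(buf)
        out.append(result)
    return out
```

Calling `sliding_averages(rows)` on a small list of dicts returns one output dict per input row with both `NUM_1_sliding_3M` and `NUM_2_sliding_3M` filled in, which is the shape I expect from the second query.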
   
   Pipeline definition:
   
   ```
   import logging

   import apache_beam as beam
   from apache_beam.transforms.sql import SqlTransform

   # runner, pipeline_options, data_size, data_types, read_from_bq and
   # ConvertToRow are defined elsewhere in the project (not shown here).
   with beam.Pipeline(runner, options=pipeline_options) as pipeline:
       logging.info(f'pipeline_options: {pipeline_options}')
       logging.getLogger().setLevel(logging.INFO)

       # Preprocess train data
       step = 'train'
       # Read raw train data from BQ
       raw_train_dataset = read_from_bq(pipeline, step, data_size)
       rows_train_dataset = raw_train_dataset[0] | 'Convert to Rows' >> beam.ParDo(ConvertToRow(data_types))

       # Apply the SQL transform
       filtered_rows = rows_train_dataset | SqlTransform(windowing_query)
   ```
   
   Why does SqlTransform not accept more than one rolling average computation?
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

