Posted to user@beam.apache.org by Wiśniowski Piotr <co...@gmail.com> on 2023/05/18 09:36:41 UTC

Beam SQL found limitations

Hi,

After experimenting with Beam SQL I found some limitations. I tested 
on near-latest main (precisely 
`5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite, the direct runner 
and openjdk version "11.0.19". Please let me know if any of them are 
known, being worked on, have tickets, or have an estimated fix time. I believe 
most of them are low-hanging fruit, or my approach is simply not right for 
the problem. If that is the case, please guide me to a working solution.

From my perspective it is OK to have a fix just on master, with no need to 
wait for a release. Priority order:
- 7. Windowing function on a stream. In detail: how to get the previous 
message for a key? Setting an arbitrarily big expiration is OK, but access to 
the previous record must happen fairly quickly, without waiting for the big 
window to finish and emit the expired keys. Ideally I would like to do it 
in a pure Beam pipeline, as saving to some external key/value store and 
then reading it back here could result in race conditions 
that I would like to avoid. But if it is the only option, let it be.
- 5. Only a single UNION ALL is possible
- 4. UNNEST ARRAY with nested ROW
- 3. Using * when there is a ROW type present in the schema
- 1. `CROSS JOIN` between two unrelated tables is not supported, even 
if one is a static number table
- 2. ROW construction is not supported, so it is not possible to nest data
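
For reference, the results that items 1 and 4 are after can be written down in a few lines of plain Python. This is only a sketch of the intended semantics, not Beam code; the sample record and the helper names `cross_join` and `unnest_commits` are made up for illustration.

```python
from itertools import product


def cross_join(left_rows, right_rows):
    """Pair every left row with every right row (what CROSS JOIN should return)."""
    return [{**left, **right} for left, right in product(left_rows, right_rows)]


def unnest_commits(record):
    """Emit one flat row per element of the nested `commits` array."""
    return [
        {
            "branch_name": record["ref"],
            "commit_hash": commit["id"],
            "author_name": commit["author"]["name"],
        }
        for commit in record["commits"]
    ]


# Hypothetical sample data shaped like the `test_tmp` table used for item 4.
sample = {
    "ref": "main",
    "commits": [
        {"id": "c1", "author": {"name": "Ann", "email": "ann@example.com"}},
        {"id": "c2", "author": {"name": "Bob", "email": "bob@example.com"}},
    ],
}

# Item 1: number a single data row with a small static number table.
numbered = cross_join([{"a": 1}], [{"number_item": n} for n in range(1, 4)])
# Item 4: explode the nested array into one row per commit.
flattened = unnest_commits(sample)
```

Each of the two helpers is a single comprehension, which is why a failure on the equivalent SQL looks like a bug rather than a missing feature.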

Below are the queries that I use to test these scenarios.

Thank you for looking at these topics!

Best

Wiśniowski Piotr

-----------------------
-- 1. `CROSS JOIN` between two unrelated tables is not supported.
-----------------------
-- Only supported is `CROSS JOIN UNNEST` when exploding array from same table.
-- It is not possible to number rows
WITH data_table AS (
    SELECT 1 AS a
),
number_table AS (
    SELECT
        numbers_exploded AS number_item
    FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS numbers_exploded
)
SELECT
    data_table.a,
    number_table.number_item
FROM data_table
CROSS JOIN number_table
;
-- CROSS JOIN, JOIN ON FALSE is not supported!
-----------------------
-- 2. ROW construction not supported. It is not possible to nest data
-----------------------
SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
SELECT MAP['field1','b','field2','a']; -- null
-- WORKAROUND - manually compose json string,
-- drawback - decomposing might be not supported or would need to be also based on string operations
SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS `json_object`;
-----------------------
-- 3. Using * when there is Row type present in the schema
-----------------------
CREATE EXTERNAL TABLE test_tmp_1(
    `ref` VARCHAR,
    `author` ROW<
        `name` VARCHAR,
        `email` VARCHAR
    >
)
TYPE text
LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
TBLPROPERTIES '{"format":"json", "deadLetterFile":"top/python/dbt/tests/dead"}';
SELECT * FROM test_tmp_1;
-- java.lang.NoSuchFieldException: name
-- WORKAROUND - refer to columns explicitly with alias
SELECT
    `ref` AS ref_value,
    test_tmp_1.`author`.`name` AS author_name, -- table name must be referenced explicitly - this could be fixed too
    test_tmp_1.`author`.`email` AS author_email
FROM test_tmp_1;
-----------------------
-- 4. UNNEST ARRAY with nested ROW
-----------------------
CREATE EXTERNAL TABLE test_tmp(
    `ref` VARCHAR,
    `commits` ARRAY<ROW<
        `id` VARCHAR,
        `author` ROW<
            `name` VARCHAR,
            `email` VARCHAR
        >
    >>
)
TYPE text
LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead"}';
SELECT
    test_tmp.`ref` AS branch_name,
    commit_item.`id` AS commit_hash,
    commit_item.`author`.`name` AS author_name
FROM test_tmp
CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
-- Row expected 4 fields (Field{name=ref, description=, type=STRING, options={{}}},
-- Field{name=commits, description=, type=ARRAY<ROW<id STRING, author ROW<name STRING, email STRING>> NOT NULL>, options={{}}},
-- Field{name=id, description=, type=STRING, options={{}}},
-- Field{name=author, description=, type=ROW<name STRING, email STRING>, options={{}}}). initialized with 5 fields.
-- limited WORKAROUND - refer to array elements by index and UNION ALL the items into rows
-- note workaround that uses number table will not work as CROSS JOIN is not supported
WITH data_parsed AS (
    SELECT
        test_tmp.`ref` AS branch_id,
        test_tmp.commits[1].`id` AS commit_hash,
        test_tmp.commits[1].`author`.`name` AS author_name
    FROM test_tmp
    UNION ALL -- this unfortunately works only for two indexes
    SELECT
        test_tmp.`ref` AS branch_id,
        test_tmp.commits[2].`id` AS commit_hash,
        test_tmp.commits[2].`author`.`name` AS author_name
    FROM test_tmp
)
SELECT *
FROM data_parsed
WHERE author_name IS NOT NULL
;
-- better WORKAROUND - but tricky to get right (fragile)
WITH data_with_number_array AS (
    SELECT
        -- there must be some primary key in the data to join on later due to CROSS JOIN support limitation
        test_tmp.`ref` AS branch_name,
        ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
        CARDINALITY(test_tmp.commits) AS commits_size
    FROM test_tmp
),
data_with_numbers AS (
    SELECT
        branch_name,
        `EXPR$0` AS number_item
    FROM data_with_number_array
    CROSS JOIN UNNEST(data_with_number_array.number_array) AS exploded
    WHERE `EXPR$0` <= commits_size
),
data_exploded AS (
    SELECT
        test_tmp.`ref` AS branch_name,
        test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
        test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS author_name
    FROM test_tmp
    INNER JOIN data_with_numbers
        ON data_with_numbers.branch_name = test_tmp.`ref`
)
SELECT
    branch_name,
    commit_hash,
    author_name
FROM data_exploded
-- WHERE author_name IS NOT NULL - not possible here due to `Non equi-join is not supported`
-- as it pushes this condition as predicate pushdown to join.
-- Is there any way to force checking this condition on here and not to project it upstream?
;
-----------------------
-- 5. single UNION ALL possible
-----------------------
SELECT 1 AS a
UNION ALL
SELECT 2 AS a
UNION ALL
SELECT 3 AS a;
-- Wrong number of arguments to BeamUnionRel: org.apache.beam.sdk.values.PCollectionList@70f145ac
-----------------------
-- 6. Reserved names
-----------------------
-- json_object
SELECT '{}' AS json_object;
-- parse failed: Incorrect syntax near the keyword 'AS' at line 1, column 13.
-- WORKAROUND SELECT '{}' AS `json_object`
-----------------------
-- 7. Windowing function on stream
-----------------------
-- in detail - How to get previous message for a key?
-- setting expiration arbitrary big is ok, but access to the previous record must happen fairly quickly
-- not wait for the big window to finish and emit the expired keys.
-- Ideally would like to do it in pure beam pipeline as saving to some external key/value store
-- and then reading this here could potentially result in some race conditions which would be hard to debug.
DROP TABLE IF EXISTS unbounded_stream;
CREATE EXTERNAL TABLE unbounded_stream(
    sequence BIGINT,
    event_time TIMESTAMP
)
TYPE 'sequence'
TBLPROPERTIES '{"elementsPerSecond":1}'
;
CREATE EXTERNAL TABLE data_1_bounded(
    `sequence_nb` BIGINT,
    `sender_login` VARCHAR,
    `user_id` VARCHAR
)
TYPE text
LOCATION 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead_letters/dead"}'
;
WITH
test_data_1_unbounded AS (
    SELECT
        sender_login,
        user_id,
        event_time
    FROM unbounded_stream
    INNER JOIN data_1_bounded
        ON unbounded_stream.sequence = data_1_bounded.sequence_nb
),
test_data_1_lookbehind AS (
    SELECT
        sender_login,
        LAST_VALUE(user_id) OVER previous_win AS user_id
    FROM test_data_1_unbounded
    WINDOW previous_win AS (
        PARTITION BY sender_login
        ORDER BY event_time ASC
        ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
    )
)
SELECT *
FROM test_data_1_lookbehind
LIMIT 8
;
-- There are not enough rules to produce a node with desired properties: convention=ENUMERABLE. All the inputs have relevant nodes, however the cost is still infinite.
-- Root: rel#29:RelSubset#4.ENUMERABLE
-- Original rel:
-- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost = {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
-- LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4) OVER (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount = 1.2, cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
-- LogicalJoin(condition=[=($0, $2)], joinType=[inner]): rowcount = 1.2, cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io}, id = 12
-- BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
-- BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount = 8.0, cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
--
-- Sets:
-- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
-- rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
-- rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, unbounded_stream]), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 1.0 io}
-- rel#33:RelSubset#0.ENUMERABLE, best=rel#32
-- rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18), rowcount=1.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
-- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
-- rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
-- rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, data_1_bounded]), rowcount=8.0, cumulative cost={8.0 rows, 8.0 cpu, 8.0 io}
-- rel#36:RelSubset#1.ENUMERABLE, best=rel#35
-- rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19), rowcount=8.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
-- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time, BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
-- rel#21:RelSubset#2.NONE, best=null
-- rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0, $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
-- rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4, $0, $1, $2]), rowcount=1.2, cumulative cost={inf}
-- rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2), rowcount=1.2, cumulative cost={inf}


Re: Beam SQL found limitations

Posted by Kenneth Knowles <ke...@apache.org>.
It would be interesting to see a design for this. You'll need to partition
or it won't scale because SQL "OVER" clause is linear and sorted in this
case. Other than that, it should be a pretty straightforward implementation
using state + timers + @RequiresTimeSortedInput. Sorting in any other way
would be a little more work, so I'd start with rejecting ORDER BY clauses
with other columns.
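
As a rough illustration of the state-based approach, the sketch below simulates it in plain Python rather than with the Beam API. Each key has a single "last seen timestamp" state cell, and the input is sorted by event time within each key first, which stands in for what @RequiresTimeSortedInput would guarantee. The function name `attach_previous` and the tuple layout are assumptions made for this example.

```python
from collections import defaultdict


def attach_previous(events):
    """For each (key, event_time) pair, return (key, event_time, previous_event_time).

    previous_event_time is None for the first event of a key.
    """
    # Partition by key so that each key can be processed independently.
    by_key = defaultdict(list)
    for key, event_time in events:
        by_key[key].append(event_time)

    enriched = []
    for key, times in by_key.items():
        previous = None  # per-key state: the last event time seen so far
        # Sorting stands in for time-sorted input; arrival order may differ.
        for event_time in sorted(times):
            enriched.append((key, event_time, previous))
            previous = event_time
    return enriched


# Events arrive out of order for user "a".
arrivals = [("a", 30), ("b", 5), ("a", 10), ("a", 20)]
result = attach_previous(arrivals)
```

Only event-time ordering is handled here, which lines up with starting out by rejecting ORDER BY clauses on other columns.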

Kenn

On Fri, Jun 9, 2023 at 5:06 AM Wiśniowski Piotr <
contact.wisniowskipiotr@gmail.com> wrote:

> Hi,
>
> BTW just found this on Calcite:
> https://calcite.apache.org/docs/stream.html#sliding-windows
>
> I think this is precisely what I was trying to do with Beam SQL and the
> syntax is also very intuitive.
>
> Could this be added to the SQL roadmap? How hard would it be to implement?
>
> Best
>
> Piotr
> On 31.05.2023 20:29, Kenneth Knowles wrote:
>
> 1. Yes, using state is a better fit than Beam windowing. You will want to
> use the new feature of annotating the DoFn with @RequiresTimeSortedInput.
> This will make it so you can be sure you are actually getting the
> "previous" event. They can arrive in any order without this annotation. You
> won't be able to do this in SQL. I don't think Beam SQL has implementations
> of analytic functions that have this ability.
>
> Kenn
>
> On Wed, May 31, 2023 at 4:17 AM Wiśniowski Piotr <
> contact.wisniowskipiotr@gmail.com> wrote:
>
>> Hi Kenn,
>>
>> Thanks for the clarification.
>>
>> 1. Just to put an example up front: for every event that comes in, I need
>> to find the corresponding previous event of the same user_id and pass
>> previous_event_timestamp down in the current event payload (the
>> current event also becomes the previous event for future events that come in
>> for the same user). The question is how to do it with Beam SQL. I am aware that
>> analytic windowing (like last_value over etc.) might not be the way to go for
>> streaming, and I am OK with this; it makes sense under the hood, just as you mention.
>>
>> The task is to be able to keep a simple state in streaming SQL. What I
>> came up with is using a sliding window to have this state available for
>> each new event that comes in.
>>
>> ```
>>
>> WITH
>> unbounded_stream_initialized AS (
>>     SELECT
>>         user_id,
>>         event_time
>>     FROM unbounded_stream
>>     GROUP BY
>>         user_id,
>>         event_time,
>>         TUMBLE(event_time,INTERVAL '1' SECONDS)
>>     UNION ALL
>>     -- this is needed as first session window by default starts at first
>>     -- element, while here we need to start it in the past
>>     -- so that there is a window that ends just after first real element
>>     SELECT
>>         CAST(0 AS BIGINT) AS user_id,
>>         CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
>>     -- this is dummy as syntax does not allow to have GROUP BY just after SELECT
>>     FROM UNNEST(ARRAY[0]) AS arr
>>     GROUP BY TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP), INTERVAL '1' SECONDS)
>> ),
>> test_data_1 AS (
>>     SELECT
>>         user_id,
>>         MAX(event_time) AS prev_event_time,
>>         HOP_END(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS) AS window_end_at
>>     FROM unbounded_stream_initialized
>>     GROUP BY
>>         user_id,
>>         HOP(
>>             -- first create a sliding window to aggregate state
>>             event_time,
>>             INTERVAL '1' SECONDS,
>>             INTERVAL '7' DAYS -- The idea is to have this quite long compared to interval
>>         )
>> ),
>> test_data_1_lookup AS (
>>     SELECT
>>         user_id,
>>         prev_event_time
>>     FROM test_data_1
>>     GROUP BY
>>         user_id,
>>         -- then re-window into windows suitable for joining main stream
>>         TUMBLE(window_end_at, INTERVAL '1' SECONDS)
>> ),
>> enriched_info AS (
>>     SELECT
>>         unbounded_stream_initialized.event_time AS event_timestamp,
>>         unbounded_stream_initialized.user_id AS user_id,
>>         test_data_1_lookup.prev_event_time AS prev_event_time
>>     FROM unbounded_stream_initialized
>>     LEFT JOIN test_data_1_lookup
>>         ON unbounded_stream_initialized.user_id = test_data_1_lookup.user_id
>> )
>> SELECT
>>     *
>> FROM enriched_info
>>
>> ```
>>
>> My doubt is whether the above will store too much redundant
>> data, as `test_data_1` suggests it could duplicate and store each incoming
>> message in every window of the sliding window definition (which might be a
>> lot in this case). Or is resolving whether a message belongs to a window
>> actually done later, when evaluating the `LEFT JOIN`? The target is Dataflow.
>> I am still learning Beam, so there might be some core concept that I am
>> missing about how it is processed.
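
To put a number on the worry above: under the usual sliding-window definition, an element belongs to every window whose start lies less than one window size before its timestamp, so it falls into `size / period` windows. The sketch below is plain Python, not Beam code, and it does not answer whether a given runner physically stores one copy per window. It assumes `size` is a multiple of `period`, and `sliding_window_starts` is a name invented for this example.

```python
def sliding_window_starts(timestamp, size, period):
    """Return the start times of all sliding windows that contain `timestamp`.

    Windows start at multiples of `period` and cover [start, start + size).
    All arguments are integers in the same unit (seconds here).
    """
    last_start = timestamp - (timestamp % period)
    first_start = last_start - size + period
    return list(range(first_start, last_start + 1, period))


SECONDS_PER_DAY = 24 * 60 * 60

# A small case that is easy to check by hand: size 10 s, period 5 s.
small = sliding_window_starts(timestamp=12, size=10, period=5)

# The HOP definition in the query above: 1-second period, 7-day size.
windows_per_element = len(
    sliding_window_starts(timestamp=1_000_000, size=7 * SECONDS_PER_DAY, period=1)
)
```

With a 1-second period and a 7-day size, every element logically belongs to 604,800 windows, which is the scale behind the cost question.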
>>
>> 2. Any hints on implementing FirestoreIOTableProvider? Just roughly
>> how to do it, where to look for the important parts, etc. It seems we would
>> need this functionality.
>>
>> 3. I will try to report some more interesting findings. If possible,
>> please prioritize fixing this ROW error.
>>
>> Best
>>
>> Piotr
>> On 26.05.2023 21:36, Kenneth Knowles wrote:
>>
>> Just want to clarify that Beam's concept of windowing is really an
>> event-time based key, and they are all processed logically simultaneously.
>> SQL's concept of windowing function is to sort rows and process them
>> linearly. They are actually totally different. From your queries it seems
>> you are interested in SQL's windowing functions (aka analytic functions).
>>
>> I am surprised by the problems with rows, since we have used them
>> extensively. Hopefully it is not too hard to fix. Same with the UNION ALL
>> problem.
>>
>> And for the CROSS JOIN it would be a nice feature to allow in some cases
>> it seems. Should not be hard.
>>
>> Thank you for reporting this! If you have time it would be really great
>> to get each of these reproducible problems into GitHub issues, each.
>>
>> Kenn
>>
>> On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr <
>> contact.wisniowskipiotr@gmail.com> wrote:
>>
>>> Hi Alexey,
>>>
>>> Thank you for the reference to that discussion. I have pretty
>>> similar thoughts on what Beam SQL needs.
>>>
>>> Update from my side:
>>>
>>> I did find a workaround for the issue with the windowing function on a
>>> stream. It basically boils down to using a sliding window to collect and
>>> aggregate the state. But I would need advice on whether this is actually a
>>> cost-efficient method (targeting the Dataflow runner). My doubt is that
>>> this sliding window would need to have a sliding interval of less than 1 s
>>> and a size of more than a week, and be fed with quite frequent data. If I
>>> understand this correctly, each input row would need to be
>>> duplicated for each window and stored, which could be quite a significant
>>> storage cost?
>>>
>>> Or does Beam not physically duplicate the record, but just
>>> track which windows the record currently belongs to?
>>>
>>>
>>> And what Beam SQL really needs at the moment, in my opinion, is
>>> bug fixes.
>>>
>>> Some bugs that I found that prevent one from using it, and for which I
>>> would really appreciate a fast fix:
>>>
>>> - UNNEST ARRAY with a nested ROW (described below, created ticket -
>>> https://github.com/apache/beam/issues/26911)
>>>
>>> - The PubSub table provider actually requires all table properties to be
>>> there (with null in `timestampAttributeKey` it fails), which essentially
>>> does not allow one to use the Pub/Sub publish timestamp as
>>> `timestampAttributeKey`.
>>>
>>> - It is not possible to cast VARCHAR to BYTES. And BYTES is needed for
>>> DataStoreV1TableProvider to provide a key for storage. Also consider
>>> updating `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it
>>> requires VARCHAR instead of BYTES; it is even easier to implement.
>>>
>>> - Any hints on how to implement `FireStoreIOTableProvider`? I am
>>> considering implementing and contributing it, depending on my team's
>>> decision, but would like to get an idea of how hard this task is.
>>>
>>> I will create tickets for the rest of the issues when I have some spare
>>> time.
>>>
>>> Best regards
>>>
>>> Wiśniowski Piotr
>>>
>>>
>>> On 22.05.2023 18:28, Alexey Romanenko wrote:
>>>
>>> Hi Piotr,
>>>
>>> Thanks for the details! I am cross-posting this to dev@ as well since, I
>>> guess, people there can provide more insight on this.
>>>
>>> A while ago, I faced similar issues trying to run Beam SQL against
>>> the TPC-DS benchmark.
>>> We had a discussion around that [1]; please take a look, since it may be
>>> helpful.
>>>
>>> [1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b
>>>
>>> —
>>> Alexey
>>>
>>> On 18 May 2023, at 11:36, Wiśniowski Piotr <co...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> After experimenting with Beam SQL I found some limitations. I tested
>>> on near-latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`)
>>> with Calcite, the direct runner and openjdk version "11.0.19". Please let me
>>> know if any of them are known, being worked on, have tickets, or have an
>>> estimated fix time. I believe most of them are low-hanging fruit, or my
>>> approach is simply not right for the problem. If that is the case, please
>>> guide me to a working solution.
>>>
>>> From my perspective it is OK to have a fix just on master, with no need to
>>> wait for a release. Priority order:
>>> - 7. Windowing function on a stream. In detail: how to get the previous
>>> message for a key? Setting an arbitrarily big expiration is OK, but access to
>>> the previous record must happen fairly quickly, without waiting for the big
>>> window to finish and emit the expired keys. Ideally I would like to do it in
>>> a pure Beam pipeline, as saving to some external key/value store and then
>>> reading it back here could result in race conditions that I would
>>> like to avoid. But if it is the only option, let it be.
>>> - 5. Only a single UNION ALL is possible
>>> - 4. UNNEST ARRAY with nested ROW
>>> - 3. Using * when there is a ROW type present in the schema
>>> - 1. `CROSS JOIN` between two unrelated tables is not supported, even
>>> if one is a static number table
>>> - 2. ROW construction is not supported, so it is not possible to nest data
>>>
>>> Below are the queries that I use to test these scenarios.
>>>
>>> Thank you for looking at these topics!
>>>
>>> Best
>>>
>>> Wiśniowski Piotr
>>> -----------------------
>>> -- 1. `CROSS JOIN` between two unrelated tables is not supported.
>>> -----------------------
>>> -- Only supported is `CROSS JOIN UNNEST` when exploding array from same
>>> table.
>>> -- It is not possible to number rows
>>> WITH data_table AS (
>>> SELECT 1 AS a
>>> ),
>>> number_table AS (
>>> SELECT
>>> numbers_exploded AS number_item
>>> FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS
>>> numbers_exploded
>>> )
>>> SELECT
>>> data_table.a,
>>> number_table.number_item
>>> FROM data_table
>>> CROSS JOIN number_table
>>> ;
>>> -- CROSS JOIN, JOIN ON FALSE is not supported!
>>> -----------------------
>>> -- 2. ROW construction not supported. It is not possible to nest data
>>> -----------------------
>>> SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>>> SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>>> SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same
>>> type
>>> SELECT MAP['field1','b','field2','a']; -- null
>>> -- WORKAROUND - manually compose json string,
>>> -- drawback - decomposing might be not supported or would need to be
>>> also based on string operations
>>> SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS
>>> `json_object`;
>>> -----------------------
>>> -- 3. Using * when there is Row type present in the schema
>>> -----------------------
>>> CREATE EXTERNAL TABLE test_tmp_1(
>>> `ref` VARCHAR,
>>> `author` ROW<
>>> `name` VARCHAR,
>>> `email` VARCHAR
>>> >
>>> )
>>> TYPE text
>>> LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
>>> TBLPROPERTIES '{"format":"json",
>>> "deadLetterFile":"top/python/dbt/tests/dead"}';
>>> SELECT * FROM test_tmp_1;
>>> -- java.lang.NoSuchFieldException: name
>>> -- WORKAROUND - refer to columns explicitly with alias
>>> SELECT
>>> `ref` AS ref_value,
>>> test_tmp_1.`author`.`name` AS author_name, -- table name must be
>>> referenced explicitly - this could be fixed too
>>> test_tmp_1.`author`.`email` AS author_email
>>> FROM test_tmp_1;
>>> -----------------------
>>> -- 4. UNNEST ARRAY with nested ROW
>>> -----------------------
>>> CREATE EXTERNAL TABLE test_tmp(
>>> `ref` VARCHAR,
>>> `commits` ARRAY<ROW<
>>> `id` VARCHAR,
>>> `author` ROW<
>>> `name` VARCHAR,
>>> `email` VARCHAR
>>> >
>>> >>
>>> )
>>> TYPE text
>>> LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
>>> TBLPROPERTIES '{"format":"json",
>>> "deadLetterFile":"python/dbt/tests/dead"}';
>>> SELECT
>>> test_tmp.`ref` AS branch_name,
>>> commit_item.`id` AS commit_hash,
>>> commit_item.`author`.`name` AS author_name
>>> FROM test_tmp
>>> CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
>>> -- Row expected 4 fields (Field{name=ref, description=, type=STRING,
>>> options={{}}}, Field{name=commits, description=, type=ARRAY<ROW<id STRING,
>>> author ROW<name STRING, email STRING>> NOT NULL>, options={{}}},
>>> Field{name=id, description=, type=STRING, options={{}}}, Field{name=author,
>>> description=, type=ROW<name STRING, email STRING>, options={{}}}).
>>> initialized with 5 fields.
>>> -- limited WORKAROUND - refer to array elements by index and UNION ALL
>>> the items into rows
>>> -- note workaround that uses number table will not work as CROSS JOIN is
>>> not supported
>>> WITH data_parsed AS (
>>> SELECT
>>> test_tmp.`ref` AS branch_id,
>>> test_tmp.commits[1].`id` AS commit_hash,
>>> test_tmp.commits[1].`author`.`name` AS author_name
>>> FROM test_tmp
>>> UNION ALL -- this unfortunately works only for two indexes
>>> SELECT
>>> test_tmp.`ref` AS branch_id,
>>> test_tmp.commits[2].`id` AS commit_hash,
>>> test_tmp.commits[2].`author`.`name` AS author_name
>>> FROM test_tmp
>>> )
>>> SELECT *
>>> FROM data_parsed
>>> WHERE author_name IS NOT NULL
>>> ;
>>> -- better WORKAROUND - but tricky to get right (fragile)
>>> WITH data_with_number_array AS (
>>> SELECT
>>> test_tmp.`ref` AS branch_name, -- there must be some primary key in the
>>> data to join on later due to CROSS JOIN support limitation
>>> ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
>>> CARDINALITY(test_tmp.commits) AS commits_size
>>> FROM test_tmp
>>> ),
>>> data_with_numbers AS (
>>> SELECT
>>> branch_name,
>>> `EXPR$0` AS number_item
>>> FROM data_with_number_array
>>> CROSS JOIN UNNEST(data_with_number_array.number_array) AS exploded
>>> WHERE `EXPR$0` <= commits_size
>>> ),
>>> data_exploded AS (
>>> SELECT
>>> test_tmp.`ref` AS branch_name,
>>> test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
>>> test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS
>>> author_name
>>> FROM test_tmp
>>> INNER JOIN data_with_numbers
>>> ON data_with_numbers.branch_name = test_tmp.`ref`
>>> )
>>> SELECT
>>> branch_name,
>>> commit_hash,
>>> author_name
>>> FROM data_exploded
>>> -- WHERE author_name IS NOT NULL - not possible here due to `Non
>>> equi-join is not supported`
>>> -- as it pushes this condition as predicate pushdown to join.
>>> -- Is there any way to force checking this condition on here and not to
>>> project it upstream?
>>> ;
>>> -----------------------
>>> -- 5. single UNION ALL possible
>>> -----------------------
>>> SELECT 1 AS a
>>> UNION ALL
>>> SELECT 2 AS a
>>> UNION ALL
>>> SELECT 3 AS a;
>>> -- Wrong number of arguments to BeamUnionRel:
>>> org.apache.beam.sdk.values.PCollectionList@70f145ac
>>> -----------------------
>>> -- 6. Reserved names
>>> -----------------------
>>> -- json_object
>>> SELECT '{}' AS json_object;
>>> -- parse failed: Incorrect syntax near the keyword 'AS' at line 1,
>>> column 13.
>>> -- WORKAROUND SELECT '{}' AS `json_object`
>>> -----------------------
>>> -- 7. Windowing function on stream
>>> -----------------------
>>> -- in detail - How to get previous message for a key?
>>> -- setting expiration arbitrary big is ok, but access to the previous
>>> record must happen fairly quickly
>>> -- not wait for the big window to finish and emit the expired keys.
>>> -- Ideally would like to do it in pure beam pipeline as saving to some
>>> external key/value store
>>> -- and then reading this here could potentially result in some race
>>> conditions which would be hard to debug.
>>> DROP TABLE IF EXISTS unbounded_stream;
>>> CREATE EXTERNAL TABLE unbounded_stream(
>>> sequence BIGINT,
>>> event_time TIMESTAMP
>>> )
>>> TYPE 'sequence'
>>> TBLPROPERTIES '{"elementsPerSecond":1}'
>>> ;
>>> CREATE EXTERNAL TABLE data_1_bounded(
>>> `sequence_nb` BIGINT,
>>> `sender_login` VARCHAR,
>>> `user_id` VARCHAR
>>> )
>>> TYPE text
>>> LOCATION
>>> 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
>>> TBLPROPERTIES '{"format":"json",
>>> "deadLetterFile":"python/dbt/tests/dead_letters/dead"}'
>>> ;
>>> WITH
>>> test_data_1_unbounded AS (
>>> SELECT
>>> sender_login,
>>> user_id,
>>> event_time
>>> FROM unbounded_stream
>>> INNER JOIN data_1_bounded
>>> ON unbounded_stream.sequence = data_1_bounded.sequence_nb
>>> ),
>>> test_data_1_lookbehind AS (
>>> SELECT
>>> sender_login,
>>> LAST_VALUE(user_id) OVER previous_win AS user_id
>>> FROM test_data_1_unbounded
>>> WINDOW previous_win AS (
>>> PARTITION BY sender_login
>>> ORDER BY event_time ASC
>>> ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
>>> )
>>> )
>>> SELECT *
>>> FROM test_data_1_lookbehind
>>> LIMIT 8
>>> ;
>>> -- There are not enough rules to produce a node with desired properties:
>>> convention=ENUMERABLE. All the inputs have relevant nodes, however the cost
>>> is still infinite.
>>> -- Root: rel#29:RelSubset#4.ENUMERABLE
>>> -- Original rel:
>>> -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost =
>>> {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
>>> -- LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4) OVER
>>> (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount = 1.2,
>>> cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
>>> -- LogicalJoin(condition=[=($0, $2)], joinType=[inner]): rowcount = 1.2,
>>> cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io}, id = 12
>>> -- BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount = 1.0,
>>> cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
>>> -- BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount = 8.0,
>>> cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
>>> --
>>> -- Sets:
>>> -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
>>> -- rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
>>> -- rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, unbounded_stream]),
>>> rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 1.0 io}
>>> -- rel#33:RelSubset#0.ENUMERABLE, best=rel#32
>>> -- rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18),
>>> rowcount=1.0, cumulative cost={1.7976931348623157E308 rows,
>>> 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>>> -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR sender_login,
>>> VARCHAR user_id)
>>> -- rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
>>> -- rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, data_1_bounded]),
>>> rowcount=8.0, cumulative cost={8.0 rows, 8.0 cpu, 8.0 io}
>>> -- rel#36:RelSubset#1.ENUMERABLE, best=rel#35
>>> -- rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19),
>>> rowcount=8.0, cumulative cost={1.7976931348623157E308 rows,
>>> 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>>> -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time,
>>> BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
>>> -- rel#21:RelSubset#2.NONE, best=null
>>> --
>>> rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0,
>>> $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
>>> -- rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4, $0, $1,
>>> $2]), rowcount=1.2, cumulative cost={inf}
>>> --
>>> rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2),
>>> rowcount=1.2, cumulative cost={inf}
>>>
>>>
>>>
>>>

Re: Beam SQL found limitations

Posted by Kenneth Knowles <ke...@apache.org>.
It would be interesting to see a design for this. You'll need to partition
or it won't scale because SQL "OVER" clause is linear and sorted in this
case. Other than that, it should be a pretty straightforward implementation
using state + timers + @RequiresTimeSortedInput. Sorting in any other way
would be a little more work, so I'd start with rejecting ORDER BY clauses
with other columns.

Kenn

On Fri, Jun 9, 2023 at 5:06 AM Wiśniowski Piotr <
contact.wisniowskipiotr@gmail.com> wrote:

> Hi,
>
> BTW just found this on Calcite:
> https://calcite.apache.org/docs/stream.html#sliding-windows
>
> I think this is precisely what I was trying to do with Beam SQL and the
> syntax is also very intuitive.
>
> Could this be added to SQL roadmap? How hard it is for implementation?
>
> Best
>
> Piotr

Re: Beam SQL found limitations

Posted by Wiśniowski Piotr <co...@gmail.com>.
Hi,

BTW just found this on Calcite: 
https://calcite.apache.org/docs/stream.html#sliding-windows

I think this is precisely what I was trying to do with Beam SQL and the 
syntax is also very intuitive.
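
As far as I understand the linked page, a sliding window there is an analytic `OVER` clause with a time-based `RANGE ... PRECEDING` frame, which emits one output row per input row. The sketch below illustrates that behaviour in plain Python. It is not Calcite or Beam code: the function name and the `(rowtime, units)` layout are assumptions made for the example, and it assumes rows arrive sorted with strictly increasing `rowtime`.

```python
def sliding_sum(rows, width):
    """For each (rowtime, units) row, sum units over [rowtime - width, rowtime].

    `rows` must be sorted by rowtime, with strictly increasing rowtime values.
    Returns a list of (rowtime, units, windowed_sum), one entry per input row.
    """
    out = []
    start = 0    # index of the oldest row still inside the frame
    running = 0  # sum of units for rows[start..current]
    for rowtime, units in rows:
        running += units
        # Drop rows that fell out of the frame for the current row.
        while rows[start][0] < rowtime - width:
            running -= rows[start][1]
            start += 1
        out.append((rowtime, units, running))
    return out


if __name__ == "__main__":
    # Times in minutes, frame width of 60 minutes.
    for row in sliding_sum([(0, 4), (30, 5), (75, 2)], 60):
        print(row)
```

The point of the comparison is that the result is available as soon as each row is seen, rather than when a long window closes.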

Could this be added to the Beam SQL roadmap? How hard would it be to implement?

Best

Piotr

On 31.05.2023 20:29, Kenneth Knowles wrote:
> 1. Yes, using state is a better fit than Beam windowing. You will want 
> to use the new feature of annotating the DoFn 
> with @RequiresTimeSortedInput. This will make it so you can be sure 
> you are actually getting the "previous" event. They can arrive in any 
> order without this annotation. You won't be able to do this in SQL. I 
> don't think Beam SQL has implementations of analytic functions that 
> have this ability.
>
> Kenn
>
> On Wed, May 31, 2023 at 4:17 AM Wiśniowski Piotr 
> <co...@gmail.com> wrote:
>
>     Hi Kenn,
>
>     Thanks for clarification.
>
>     1. Just to put an example up front: for every event that comes in,
>     I need to find the corresponding previous event of the same
>     user_id and pass previous_event_timestamp down in the current
>     event payload (the current event then becomes the previous event
>     for future events that come in for the same user). The question
>     is how to do it with Beam SQL. I am aware that analytic windowing
>     (like last_value over etc.) might not be the way for streaming,
>     and I am OK with this - it makes sense under the hood, just as
>     you mention.
>
>     The task is to be able to keep a simple state in streaming SQL.
>     What I came up with is using a sliding window to have this state
>     available for each new event that comes in.
>
>     ```
>
>     WITH
>     unbounded_stream_initialized AS (
>         SELECT
>             user_id,
>             event_time
>         FROM unbounded_stream
>         GROUP BY
>             user_id,
>             event_time,
>             TUMBLE(event_time, INTERVAL '1' SECONDS)
>         UNION ALL
>         -- this is needed as first session window by default starts at
>         -- first element, while here we need to start it in the past
>         -- so that there is a window that ends just after first real
>         -- element
>         SELECT
>             CAST(0 AS BIGINT) AS user_id,
>             CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
>         -- the UNNEST below is a dummy, as the syntax does not allow
>         -- GROUP BY directly after SELECT
>         FROM UNNEST(ARRAY[0]) AS arr
>         GROUP BY TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP),
>             INTERVAL '1' SECONDS)
>     ),
>     test_data_1 AS (
>         SELECT
>             user_id,
>             MAX(event_time) AS prev_event_time,
>             HOP_END(event_time, INTERVAL '1' SECONDS,
>                 INTERVAL '7' DAYS) AS window_end_at
>         FROM unbounded_stream_initialized
>         GROUP BY
>             user_id,
>             HOP(
>                 -- first create a sliding window to aggregate state
>                 event_time,
>                 INTERVAL '1' SECONDS,
>                 -- the idea is to have this quite long compared to
>                 -- the interval
>                 INTERVAL '7' DAYS
>             )
>     ),
>     test_data_1_lookup AS (
>         SELECT
>             user_id,
>             prev_event_time
>         FROM test_data_1
>         GROUP BY
>             user_id,
>             -- then re-window into windows suitable for joining the
>             -- main stream
>             TUMBLE(window_end_at, INTERVAL '1' SECONDS)
>     ),
>     enriched_info AS (
>         SELECT
>             unbounded_stream_initialized.event_time AS event_timestamp,
>             unbounded_stream_initialized.user_id AS user_id,
>             test_data_1_lookup.prev_event_time AS prev_event_time
>         FROM unbounded_stream_initialized
>         LEFT JOIN test_data_1_lookup
>             ON unbounded_stream_initialized.user_id =
>                 test_data_1_lookup.user_id
>     )
>     SELECT
>         *
>     FROM enriched_info
>
>     ```
>
>     My doubt is whether the above will store too much redundant
>     data: `test_data_1` suggests it could duplicate and store each
>     incoming message in every window of the sliding window
>     definition (which might be a lot in this case). Or is the
>     membership of a message in a window only resolved later, when
>     evaluating the `LEFT JOIN`? The target is Dataflow. I am still
>     learning Beam, so I may be missing some core concept of how this
>     is processed.
>
>     2. Any hints on implementing FirestoreIOTableProvider? Just
>     roughly how to do it, where to look for the important parts, etc.
>     It seems we would need this functionality.
>
>     3. I will try to report some more interesting findings. If
>     possible, please prioritize fixing this ROW error.
>
>     Best
>
>     Piotr
>
>     On 26.05.2023 21:36, Kenneth Knowles wrote:
>>     Just want to clarify that Beam's concept of windowing is really
>>     an event-time based key, and they are all processed logically
>>     simultaneously. SQL's concept of windowing function is to sort
>>     rows and process them linearly. They are actually totally
>>     different. From your queries it seems you are interested in SQL's
>>     windowing functions (aka analytic functions).
>>
>>     I am surprised by the problems with rows, since we have used them
>>     extensively. Hopefully it is not too hard to fix. Same with the
>>     UNION ALL problem.
>>
>>     And for the CROSS JOIN it would be a nice feature to allow in
>>     some cases it seems. Should not be hard.
>>
>>     Thank you for reporting this! If you have time it would be really
>>     great to get each of these reproducible problems into GitHub
>>     issues, each.
>>
>>     Kenn
>>
>>     On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr
>>     <co...@gmail.com> wrote:
>>
>>         Hi Alexey,
>>
>>         Thank you for the reference to that discussion. I actually
>>         have pretty similar thoughts on what Beam SQL needs.
>>
>>         Update from my side:
>>
>>         I did find a workaround for the issue with the windowing
>>         function on a stream. It basically boils down to using a
>>         sliding window to collect and aggregate the state. But I
>>         would need advice on whether this is actually a
>>         cost-efficient method (targeting the Dataflow runner). My
>>         doubt is that this sliding window would need a sliding
>>         interval of less than 1s and a size of more than a week,
>>         and would be fed with quite frequent data. If I understand
>>         this correctly, each input row would need to be duplicated
>>         for each window and stored, which could mean a quite
>>         significant storage cost?
>>
>>         Or does Beam not physically duplicate the record, but just
>>         track which windows the record currently belongs to?
>>
>>
>>         And what Beam SQL really needs at the moment, in my
>>         opinion, is bug fixes.
>>
>>         Some bugs that I found that prevent one from using it, and
>>         for which I would really appreciate a fast fix:
>>
>>         - UNNEST ARRAY with a nested ROW (described below, created
>>         ticket - https://github.com/apache/beam/issues/26911)
>>
>>         - PubSub table provider actually requires all table
>>         properties to be there (with null in `timestampAttributeKey`
>>         it fails) - which essentially does not allow one to use
>>         pubsub publish timestamp as `timestampAttributeKey`.
>>
>>         - It's not possible to cast VARCHAR to BYTES, and BYTES is
>>         needed for DataStoreV1TableProvider to provide a key for
>>         storage. Also consider updating
>>         `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it
>>         requires VARCHAR instead of BYTES - it's even easier to
>>         implement.
>>
>>         - Any hints on how to implement `FireStoreIOTableProvider`? I
>>         am considering implementing and contributing it, depending on
>>         my team's decision, but would like to get an idea of how hard
>>         this task is.
>>
>>         I will create tickets for the rest of the issues when I
>>         have some spare time.
>>
>>         Best regards
>>
>>         Wiśniowski Piotr
>>
>>
>>         On 22.05.2023 18:28, Alexey Romanenko wrote:
>>>         Hi Piotr,
>>>
>>>         Thanks for the details! I am cross-posting this to dev@ as
>>>         well since, I guess, people there can provide more insight.
>>>
>>>         A while ago, I faced similar issues trying to run Beam SQL
>>>         against the TPC-DS benchmark.
>>>         We had a discussion around that [1]; please take a look,
>>>         since it may be helpful.
>>>
>>>         [1]
>>>         https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b
>>>
>>>         —
>>>         Alexey
>>>
>>>>         On 18 May 2023, at 11:36, Wiśniowski Piotr
>>>>         <co...@gmail.com> wrote:
>>>>
>>>>         Hi,
>>>>
>>>>         After experimenting with Beam SQL I found some
>>>>         limitations. I tested on near-latest main (precisely
>>>>         `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite,
>>>>         the direct runner and openjdk version "11.0.19". Please let
>>>>         me know if some of them are known, being worked on, have
>>>>         tickets or have an estimated fix time. I believe most of
>>>>         them are low-hanging fruit, or my thinking is just not
>>>>         right for the problem. If that is the case, please guide me
>>>>         to a working solution.
>>>>
>>>>         From my perspective it is OK to have a fix just on master
>>>>         - no need to wait for a release. Priority order:
>>>>         - 7. Windowing function on a stream. In detail: how to
>>>>         get the previous message for a key? Setting the expiration
>>>>         arbitrarily big is OK, but access to the previous record
>>>>         must happen fairly quickly, not wait for the big window to
>>>>         finish and emit the expired keys. Ideally I would like to
>>>>         do it in a pure Beam pipeline, as saving to some external
>>>>         key/value store and then reading it back could result in
>>>>         race conditions, which I would like to avoid; but if it is
>>>>         the only option, let it be.
>>>>         - 5. single UNION ALL possible
>>>>         - 4. UNNEST ARRAY with nested ROW
>>>>         - 3. Using * when there is Row type present in the schema
>>>>         - 1. `CROSS JOIN` between two unrelated tables is not
>>>>         supported - even if one is a static number table
>>>>         - 2. ROW construction not supported. It is not possible to
>>>>         nest data
>>>>
>>>>         Below are the queries that I use to test these scenarios.
>>>>
>>>>         Thank you for looking at these topics!
>>>>
>>>>         Best
>>>>
>>>>         Wiśniowski Piotr
>>>>
>>>>         -----------------------
>>>>         -- 1. `CROSS JOIN` between two unrelated tables is not
>>>>         -- supported.
>>>>         -----------------------
>>>>         -- Only supported is `CROSS JOIN UNNEST` when exploding
>>>>         -- array from same table.
>>>>         -- It is not possible to number rows
>>>>         WITH data_table AS (
>>>>         SELECT 1 AS a
>>>>         ),
>>>>         number_table AS (
>>>>         SELECT
>>>>         numbers_exploded AS number_item
>>>>         FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16])
>>>>         AS numbers_exploded
>>>>         )
>>>>         SELECT
>>>>         data_table.a,
>>>>         number_table.number_item
>>>>         FROM data_table
>>>>         CROSS JOIN number_table
>>>>         ;
>>>>         -- CROSS JOIN, JOIN ON FALSE is not supported!
>>>>         -----------------------
>>>>         -- 2. ROW construction not supported. It is not possible to
>>>>         -- nest data
>>>>         -----------------------
>>>>         SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>>>>         SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>>>>         SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
>>>>         SELECT MAP['field1','b','field2','a']; -- null
>>>>         -- WORKAROUND - manually compose json string,
>>>>         -- drawback - decomposing might be not supported or would
>>>>         -- need to be also based on string operations
>>>>         SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}')
>>>>         AS `json_object`;
>>>>         -----------------------
>>>>         -- 3. Using * when there is Row type present in the schema
>>>>         -----------------------
>>>>         CREATE EXTERNAL TABLE test_tmp_1(
>>>>         `ref` VARCHAR,
>>>>         `author` ROW<
>>>>         `name` VARCHAR,
>>>>         `email` VARCHAR
>>>>         >
>>>>         )
>>>>         TYPE text
>>>>         LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
>>>>         TBLPROPERTIES '{"format":"json",
>>>>         "deadLetterFile":"top/python/dbt/tests/dead"}';
>>>>         SELECT * FROM test_tmp_1;
>>>>         -- java.lang.NoSuchFieldException: name
>>>>         -- WORKAROUND - refer to columns explicitly with alias
>>>>         SELECT
>>>>         `ref` AS ref_value,
>>>>         -- table name must be referenced explicitly - this could be
>>>>         -- fixed too
>>>>         test_tmp_1.`author`.`name` AS author_name,
>>>>         test_tmp_1.`author`.`email` AS author_email
>>>>         FROM test_tmp_1;
>>>>         -----------------------
>>>>         -- 4. UNNEST ARRAY with nested ROW
>>>>         -----------------------
>>>>         CREATE EXTERNAL TABLE test_tmp(
>>>>         `ref` VARCHAR,
>>>>         `commits` ARRAY<ROW<
>>>>         `id` VARCHAR,
>>>>         `author` ROW<
>>>>         `name` VARCHAR,
>>>>         `email` VARCHAR
>>>>         >
>>>>         >>
>>>>         )
>>>>         TYPE text
>>>>         LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
>>>>         TBLPROPERTIES '{"format":"json",
>>>>         "deadLetterFile":"python/dbt/tests/dead"}';
>>>>         SELECT
>>>>         test_tmp.`ref` AS branch_name,
>>>>         commit_item.`id` AS commit_hash,
>>>>         commit_item.`author`.`name` AS author_name
>>>>         FROM test_tmp
>>>>         CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
>>>>         -- Row expected 4 fields (Field{name=ref, description=,
>>>>         type=STRING, options={{}}}, Field{name=commits,
>>>>         description=, type=ARRAY<ROW<id STRING, author ROW<name
>>>>         STRING, email STRING>> NOT NULL>, options={{}}},
>>>>         Field{name=id, description=, type=STRING, options={{}}},
>>>>         Field{name=author, description=, type=ROW<name STRING,
>>>>         email STRING>, options={{}}}). initialized with 5 fields.
>>>>         -- limited WORKAROUND - refer to array elements by index
>>>>         and UNION ALL the items into rows
>>>>         -- note workaround that uses number table will not work as
>>>>         CROSS JOIN is not supported
>>>>         WITH data_parsed AS (
>>>>         SELECT
>>>>         test_tmp.`ref` AS branch_id,
>>>>         test_tmp.commits[1].`id` AS commit_hash,
>>>>         test_tmp.commits[1].`author`.`name` AS author_name
>>>>         FROM test_tmp
>>>>         UNION ALL -- this unfortunately works only for two indexes
>>>>         SELECT
>>>>         test_tmp.`ref` AS branch_id,
>>>>         test_tmp.commits[2].`id` AS commit_hash,
>>>>         test_tmp.commits[2].`author`.`name` AS author_name
>>>>         FROM test_tmp
>>>>         )
>>>>         SELECT *
>>>>         FROM data_parsed
>>>>         WHERE author_name IS NOT NULL
>>>>         ;
>>>>         -- better WORKAROUND - but tricky to get right (fragile)
>>>>         WITH data_with_number_array AS (
>>>>         SELECT
>>>>         test_tmp.`ref` AS branch_name, -- there must be some primary key in the data to join on later due to CROSS JOIN support limitation
>>>>         ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
>>>>         CARDINALITY(test_tmp.commits) AS commits_size
>>>>         FROM test_tmp
>>>>         ),
>>>>         data_with_numbers AS (
>>>>         SELECT
>>>>         branch_name,
>>>>         `EXPR$0` AS number_item
>>>>         FROM data_with_number_array
>>>>         CROSS JOIN UNNEST(data_with_number_array.number_array)
>>>>         AS exploded
>>>>         WHERE `EXPR$0` <= commits_size
>>>>         ),
>>>>         data_exploded AS (
>>>>         SELECT
>>>>         test_tmp.`ref` AS branch_name,
>>>>         test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
>>>>         test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS author_name
>>>>         FROM test_tmp
>>>>         INNER JOIN data_with_numbers
>>>>         ON data_with_numbers.branch_name = test_tmp.`ref`
>>>>         )
>>>>         SELECT
>>>>         branch_name,
>>>>         commit_hash,
>>>>         author_name
>>>>         FROM data_exploded
>>>>         -- WHERE author_name IS NOT NULL - not possible here due to
>>>>         `Non equi-join is not supported`
>>>>         -- as it pushes this condition as predicate pushdown to join.
>>>>         -- Is there any way to force checking this condition on
>>>>         here and not to project it upstream?
>>>>         ;
>>>>         -----------------------
>>>>         -- 5. single UNION ALL possible
>>>>         -----------------------
>>>>         SELECT 1 AS a
>>>>         UNION ALL
>>>>         SELECT 2 AS a
>>>>         UNION ALL
>>>>         SELECT 3 AS a;
>>>>         -- Wrong number of arguments to BeamUnionRel:
>>>>         org.apache.beam.sdk.values.PCollectionList@70f145ac
>>>>         -----------------------
>>>>         -- 6. Reserved names
>>>>         -----------------------
>>>>         -- json_object
>>>>         SELECT '{}' AS json_object;
>>>>         -- parse failed: Incorrect syntax near the keyword 'AS' at
>>>>         line 1, column 13.
>>>>         -- WORKAROUND SELECT '{}' AS `json_object`
>>>>         -----------------------
>>>>         -- 7. Windowing function on stream
>>>>         -----------------------
>>>>         -- in detail - How to get previous message for a key?
>>>>         -- setting expiration arbitrary big is ok, but access to
>>>>         the previous record must happen fairly quickly
>>>>         -- not wait for the big window to finish and emit the
>>>>         expired keys.
>>>>         -- Ideally would like to do it in pure beam pipeline as
>>>>         saving to some external key/value store
>>>>         -- and then reading this here could potentially result in
>>>>         some race conditions which would be hard to debug.
>>>>         DROP TABLE IF EXISTS unbounded_stream;
>>>>         CREATE EXTERNAL TABLE unbounded_stream(
>>>>         sequence BIGINT,
>>>>         event_time TIMESTAMP
>>>>         )
>>>>         TYPE 'sequence'
>>>>         TBLPROPERTIES '{"elementsPerSecond":1}'
>>>>         ;
>>>>         CREATE EXTERNAL TABLE data_1_bounded(
>>>>         `sequence_nb` BIGINT,
>>>>         `sender_login` VARCHAR,
>>>>         `user_id` VARCHAR
>>>>         )
>>>>         TYPE text
>>>>         LOCATION 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
>>>>         TBLPROPERTIES '{"format":"json",
>>>>         "deadLetterFile":"python/dbt/tests/dead_letters/dead"}'
>>>>         ;
>>>>         WITH
>>>>         test_data_1_unbounded AS (
>>>>         SELECT
>>>>         sender_login,
>>>>         user_id,
>>>>         event_time
>>>>         FROM unbounded_stream
>>>>         INNER JOIN data_1_bounded
>>>>         ON unbounded_stream.sequence = data_1_bounded.sequence_nb
>>>>         ),
>>>>         test_data_1_lookbehind AS (
>>>>         SELECT
>>>>         sender_login,
>>>>         LAST_VALUE(user_id) OVER previous_win AS user_id
>>>>         FROM test_data_1_unbounded
>>>>         WINDOW previous_win AS (
>>>>         PARTITION BY sender_login
>>>>         ORDER BY event_time ASC
>>>>         ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
>>>>         )
>>>>         )
>>>>         SELECT *
>>>>         FROM test_data_1_lookbehind
>>>>         LIMIT 8
>>>>         ;
>>>>         -- There are not enough rules to produce a node with
>>>>         desired properties: convention=ENUMERABLE. All the inputs
>>>>         have relevant nodes, however the cost is still infinite.
>>>>         -- Root: rel#29:RelSubset#4.ENUMERABLE
>>>>         -- Original rel:
>>>>         -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost
>>>>         = {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
>>>>         -- LogicalProject(sender_login=[$3],
>>>>         user_id=[LAST_VALUE($4) OVER (PARTITION BY $3 ORDER BY $1
>>>>         ROWS 1 PRECEDING)]): rowcount = 1.2, cumulative cost =
>>>>         {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
>>>>         -- LogicalJoin(condition=[=($0, $2)], joinType=[inner]):
>>>>         rowcount = 1.2, cumulative cost = {10.2 rows, 9.0 cpu, 9.0
>>>>         io}, id = 12
>>>>         -- BeamIOSourceRel(table=[[beam, unbounded_stream]]):
>>>>         rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 1.0
>>>>         io}, id = 1
>>>>         -- BeamIOSourceRel(table=[[beam, data_1_bounded]]):
>>>>         rowcount = 8.0, cumulative cost = {8.0 rows, 8.0 cpu, 8.0
>>>>         io}, id = 3
>>>>         -- 
>>>>         -- Sets:
>>>>         -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6)
>>>>         event_time)
>>>>         -- rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
>>>>         -- rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam,
>>>>         unbounded_stream]), rowcount=1.0, cumulative cost={1.0
>>>>         rows, 1.0 cpu, 1.0 io}
>>>>         -- rel#33:RelSubset#0.ENUMERABLE, best=rel#32
>>>>         --
>>>>         rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18),
>>>>         rowcount=1.0, cumulative cost={1.7976931348623157E308 rows,
>>>>         1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>>>>         -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR
>>>>         sender_login, VARCHAR user_id)
>>>>         -- rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
>>>>         -- rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam,
>>>>         data_1_bounded]), rowcount=8.0, cumulative cost={8.0 rows,
>>>>         8.0 cpu, 8.0 io}
>>>>         -- rel#36:RelSubset#1.ENUMERABLE, best=rel#35
>>>>         --
>>>>         rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19),
>>>>         rowcount=8.0, cumulative cost={1.7976931348623157E308 rows,
>>>>         1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>>>>         -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6)
>>>>         event_time, BIGINT sequence_nb, VARCHAR sender_login,
>>>>         VARCHAR user_id)
>>>>         -- rel#21:RelSubset#2.NONE, best=null
>>>>         --
>>>>         rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0,
>>>>         $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
>>>>         -- rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3,
>>>>         $4, $0, $1, $2]), rowcount=1.2, cumulative cost={inf}
>>>>         --
>>>>         rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2),
>>>>         rowcount=1.2, cumulative cost={inf}
>>>>
>>>>
>>>

Re: Beam SQL found limitations

Posted by Kenneth Knowles <ke...@apache.org>.
1. Yes, using state is a better fit than Beam windowing. You will want to
use the new feature of annotating the DoFn with @RequiresTimeSortedInput.
This will make it so you can be sure you are actually getting the
"previous" event. They can arrive in any order without this annotation. You
won't be able to do this in SQL. I don't think Beam SQL has implementations
of analytic functions that have this ability.
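
To make the state-based approach a bit more concrete, here is a minimal plain-Python sketch of the per-key "previous event" logic. It is not Beam code: the function name `attach_previous_event_time`, the tuple layout and the sample events are made up for illustration. The explicit sort only stands in for the time-sorted delivery that @RequiresTimeSortedInput is meant to give you, and the dictionary stands in for per-key state.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# Hypothetical event shape: (user_id, event_time as epoch seconds).
Event = Tuple[str, int]


def attach_previous_event_time(
    events: Iterable[Event],
) -> List[Tuple[str, int, Optional[int]]]:
    """Pair every event with the previous event time seen for the same user_id."""
    # Stand-in for per-key state: the last event time seen for each key.
    last_seen: Dict[str, int] = {}
    enriched = []
    # Sorting by event time imitates time-sorted input; without it the
    # "previous" event could simply be whichever one happened to arrive earlier.
    for user_id, event_time in sorted(events, key=lambda e: e[1]):
        enriched.append((user_id, event_time, last_seen.get(user_id)))
        # The current event becomes the "previous" one for later events.
        last_seen[user_id] = event_time
    return enriched


# Events deliberately given out of order.
events = [("u1", 30), ("u2", 12), ("u1", 10), ("u1", 20)]
result = attach_previous_event_time(events)
for row in result:
    print(row)
```

The first event of each user gets `None` as its previous time. In a real pipeline the dictionary would be replaced by per-key state inside a stateful DoFn, so nothing has to wait for a long window to close.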

Kenn

On Wed, May 31, 2023 at 4:17 AM Wiśniowski Piotr <
contact.wisniowskipiotr@gmail.com> wrote:

> Hi Kenn,
>
> Thanks for clarification.
>
> 1. Just to put an example in front - for every event that comes in I need
> to find the corresponding previous event of the same user_id and pass
> previous_event_timestamp down in the current event payload (the current
> event also becomes the previous event for future events that come in for
> the same user). The question is how to do it with BeamSQL. I am aware that
> analytic windowing (like last_value over etc.) might not be a way for
> streaming and I am ok with this - it makes sense under the hood, just as you
> mention.
>
> The task is to be able to keep a simple state in streaming SQL. What I
> came up with is using a sliding window to have this state available for each
> new event that comes in.
>
> ```
>
> WITH
> unbounded_stream_initialized AS (
>     SELECT
>         user_id,
>         event_time
>     FROM unbounded_stream
>     GROUP BY
>         user_id,
>         event_time,
>         TUMBLE(event_time,INTERVAL '1' SECONDS)
>     UNION ALL
>     -- this is needed as first session window by default starts at first
>     -- element, while here we need to start it in the past
>     -- so that there is a window that ends just after first real element
>     SELECT
>         CAST(0 AS BIGINT) AS user_id,
>         CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
>     -- this is dummy as syntax does not allow to have GROUP BY just after
>     -- SELECT
>     FROM UNNEST(ARRAY[0]) AS arr
>     GROUP BY TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP), INTERVAL '1'
> SECONDS)
> ),
> test_data_1 AS (
>     SELECT
>         user_id,
>         MAX(event_time) AS prev_event_time,
>         HOP_END(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS) AS
> window_end_at
>     FROM unbounded_stream_initialized
>     GROUP BY
>         user_id,
>         HOP(
>             -- first create a sliding window to aggregate state
>             event_time,
>             INTERVAL '1' SECONDS,
>             -- The idea is to have this quite long compared to interval
>             INTERVAL '7' DAYS
>         )
> ),
> test_data_1_lookup AS (
>     SELECT
>         user_id,
>         prev_event_time
>     FROM test_data_1
>     GROUP BY
>         user_id,
>         -- then re-window into windows suitable for joining main stream
>         TUMBLE(window_end_at, INTERVAL '1' SECONDS)
> ),
> enriched_info AS (
>     SELECT
>         unbounded_stream_initialized.event_time AS event_timestamp,
>         unbounded_stream_initialized.user_id AS user_id,
>         test_data_1_lookup.prev_event_time AS prev_event_time
>     FROM unbounded_stream_initialized
>     LEFT JOIN test_data_1_lookup
>         ON unbounded_stream_initialized.user_id =
> test_data_1_lookup.user_id
> )
> SELECT
>     *
> FROM enriched_info
>
> ```
>
> My doubt is whether the above will store too much redundant data:
> `test_data_1` suggests it could duplicate each incoming message and store
> it in every window of the sliding window definition (which might be a lot
> in this case). Or is resolving whether a message belongs to a window
> actually done later, when evaluating the `LEFT JOIN`? The target is
> Dataflow. I am still learning Beam, so there might be some core thing about
> how it is processed that I fail to understand.
>
> 2. Any hints on implementing FirestoreIOTableProvider? Just roughly how to
> do it, where to look for the important parts, etc. It seems we would need
> this functionality.
>
> 3. I will try to report some more interesting findings. If possible please
> prioritize fixing this ROW error.
>
> Best
>
> Piotr
>
> On 26.05.2023 21:36, Kenneth Knowles wrote:
>
> Just want to clarify that Beam's concept of windowing is really an
> event-time based key, and they are all processed logically simultaneously.
> SQL's concept of windowing function is to sort rows and process them
> linearly. They are actually totally different. From your queries it seems
> you are interested in SQL's windowing functions (aka analytic functions).
>
> I am surprised by the problems with rows, since we have used them
> extensively. Hopefully it is not too hard to fix. Same with the UNION ALL
> problem.
>
> And for the CROSS JOIN it would be a nice feature to allow in some cases
> it seems. Should not be hard.
>
> Thank you for reporting this! If you have time it would be really great to
> get each of these reproducible problems into its own GitHub issue.
>
> Kenn
>
> On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr <
> contact.wisniowskipiotr@gmail.com> wrote:
>
>> Hi Alexey,
>>
>> Thank you for the reference to that discussion. I actually have pretty
>> similar thoughts on what Beam SQL needs.
>>
>> Update from my side:
>>
>> I did actually find a workaround for the issue with the windowing function
>> on a stream. It basically boils down to using a sliding window to collect
>> and aggregate the state. But I would need advice on whether this is
>> actually a cost-efficient method (targeting the Dataflow runner). My doubt
>> is that this sliding window would need a sliding interval of less than 1s
>> and a size of more than a week, and would be fed with quite frequent data.
>> If I understand this correctly, each input row would need to be duplicated
>> for each window and stored, which could be quite a significant storage
>> cost?
>>
>> Or does Beam not physically duplicate the record, but just track which
>> windows the record currently belongs to?
>>
>>
>> What BeamSQL really needs at the moment, in my opinion, is bug fixes.
>>
>> Some bugs that I found prevent one from using it, and I would really
>> appreciate a fast fix:
>>
>> - UNNEST ARRAY with a nested ROW (described below, created ticket -
>> https://github.com/apache/beam/issues/26911)
>>
>> - The PubSub table provider actually requires all table properties to be
>> present (with null in `timestampAttributeKey` it fails), which essentially
>> does not allow one to use the Pub/Sub publish timestamp as
>> `timestampAttributeKey`.
>>
>> - It's not possible to cast VARCHAR to BYTES, and BYTES is needed for
>> DataStoreV1TableProvider to provide a key for storage. Also consider
>> updating `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it
>> requires VARCHAR instead of BYTES - it's even easier to implement.
>>
>> - Any hints on how to implement `FireStoreIOTableProvider`? I am
>> considering implementing and contributing it, depending on my team's
>> decision, but would like to get an idea of how hard this task is.
>>
>> I will create tickets for the rest of the issues when I have some spare
>> time.
>>
>> Best regards
>>
>> Wiśniowski Piotr
>>
>>
>> On 22.05.2023 18:28, Alexey Romanenko wrote:
>>
>> Hi Piotr,
>>
>> Thanks for details! I cross-post this to dev@ as well since, I guess,
>> people there can provide more insights on this.
>>
>> A while ago, I faced similar issues trying to run Beam SQL against the
>> TPC-DS benchmark.
>> We had a discussion around that [1]; please take a look, since it can be
>> helpful.
>>
>> [1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b
>>
>> —
>> Alexey
>>
>> On 18 May 2023, at 11:36, Wiśniowski Piotr
>> <co...@gmail.com> <co...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> After experimenting with Beam SQL I found some limitations. I tested on
>> near-latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`)
>> with Calcite, the direct runner and openjdk version "11.0.19". Please let
>> me know if some of them are known, being worked on, have tickets or have an
>> estimated fix time. I believe most of them are low-hanging fruit, or my
>> thinking is just not right for the problem. If that is the case, please
>> guide me to some working solution.
>>
>> From my perspective it is ok to have a fix just on master - no need to
>> wait for a release. Priority order:
>> - 7. Windowing function on a stream - in detail - how to get the previous
>> message for a key? Setting an arbitrarily big expiration is ok, but access
>> to the previous record must happen fairly quickly, not wait for the big
>> window to finish and emit the expired keys. Ideally I would like to do it
>> in a pure Beam pipeline, as saving to some external key/value store and
>> then reading it here could potentially result in some race conditions,
>> which I would like to avoid, but if it's the only option - let it be.
>> - 5. single UNION ALL possible
>> - 4. UNNEST ARRAY with nested ROW
>> - 3. Using * when there is Row type present in the schema
>> - 1. `CROSS JOIN` between two unrelated tables is not supported - even if
>> one is a static number table
>> - 2. ROW construction not supported. It is not possible to nest data
>>
>> Below are the queries that I use to test these scenarios.
>>
>> Thank you for looking at these topics!
>>
>> Best
>>
>> Wiśniowski Piotr
>> -----------------------
>> -- 1. `CROSS JOIN` between two unrelated tables is not supported.
>> -----------------------
>> -- Only supported is `CROSS JOIN UNNEST` when exploding array from same
>> table.
>> -- It is not possible to number rows
>> WITH data_table AS (
>> SELECT 1 AS a
>> ),
>> number_table AS (
>> SELECT
>> numbers_exploded AS number_item
>> FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS
>> numbers_exploded
>> )
>> SELECT
>> data_table.a,
>> number_table.number_item
>> FROM data_table
>> CROSS JOIN number_table
>> ;
>> -- CROSS JOIN, JOIN ON FALSE is not supported!
>> -----------------------
>> -- 2. ROW construction not supported. It is not possible to nest data
>> -----------------------
>> SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>> SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>> SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same
>> type
>> SELECT MAP['field1','b','field2','a']; -- null
>> -- WORKAROUND - manually compose json string,
>> -- drawback - decomposing might be not supported or would need to be also
>> based on string operations
>> SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS
>> `json_object`;
>> -----------------------
>> -- 3. Using * when there is Row type present in the schema
>> -----------------------
>> CREATE EXTERNAL TABLE test_tmp_1(
>> `ref` VARCHAR,
>> `author` ROW<
>> `name` VARCHAR,
>> `email` VARCHAR
>> >
>> )
>> TYPE text
>> LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
>> TBLPROPERTIES '{"format":"json",
>> "deadLetterFile":"top/python/dbt/tests/dead"}';
>> SELECT * FROM test_tmp_1;
>> -- java.lang.NoSuchFieldException: name
>> -- WORKAROUND - refer to columns explicitly with alias
>> SELECT
>> `ref` AS ref_value,
>> test_tmp_1.`author`.`name` AS author_name, -- table name must be
>> referenced explicitly - this could be fixed too
>> test_tmp_1.`author`.`email` AS author_name
>> FROM test_tmp_1;
>> -----------------------
>> -- 4. UNNEST ARRAY with nested ROW
>> -----------------------
>> CREATE EXTERNAL TABLE test_tmp(
>> `ref` VARCHAR,
>> `commits` ARRAY<ROW<
>> `id` VARCHAR,
>> `author` ROW<
>> `name` VARCHAR,
>> `email` VARCHAR
>> >
>> >>
>> )
>> TYPE text
>> LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
>> TBLPROPERTIES '{"format":"json",
>> "deadLetterFile":"python/dbt/tests/dead"}';
>> SELECT
>> test_tmp.`ref` AS branch_name,
>> commit_item.`id` AS commit_hash,
>> commit_item.`author`.`name` AS author_name
>> FROM test_tmp
>> CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
>> -- Row expected 4 fields (Field{name=ref, description=, type=STRING,
>> options={{}}}, Field{name=commits, description=, type=ARRAY<ROW<id STRING,
>> author ROW<name STRING, email STRING>> NOT NULL>, options={{}}},
>> Field{name=id, description=, type=STRING, options={{}}}, Field{name=author,
>> description=, type=ROW<name STRING, email STRING>, options={{}}}).
>> initialized with 5 fields.
>> -- limited WORKAROUND - refer to array elements by index and UNION ALL
>> the items into rows
>> -- note workaround that uses number table will not work as CROSS JOIN is
>> not supported
>> WITH data_parsed AS (
>> SELECT
>> test_tmp.`ref` AS branch_id,
>> test_tmp.commits[1].`id` AS commit_hash,
>> test_tmp.commits[1].`author`.`name` AS author_name
>> FROM test_tmp
>> UNION ALL -- this unfortunately works only for two indexes
>> SELECT
>> test_tmp.`ref` AS branch_id,
>> test_tmp.commits[2].`id` AS commit_hash,
>> test_tmp.commits[2].`author`.`name` AS author_name
>> FROM test_tmp
>> )
>> SELECT *
>> FROM data_parsed
>> WHERE author_name IS NOT NULL
>> ;
>> -- better WORKAROUND - but tricky to get right (fragile)
>> WITH data_with_number_array AS (
>> SELECT
>> test_tmp.`ref` AS branch_name, -- there must be some primary key in the
>> data to join on later due to CROSS JOIN support limitation
>> ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
>> CARDINALITY(test_tmp.commits) AS commits_size
>> FROM test_tmp
>> ),
>> data_with_numbers AS (
>> SELECT
>> branch_name,
>> `EXPR$0` AS number_item
>> FROM data_with_number_array
>> CROSS JOIN UNNEST(data_with_number_array.number_array) AS exploded
>> WHERE `EXPR$0` <= commits_size
>> ),
>> data_exploded AS (
>> SELECT
>> test_tmp.`ref` AS branch_name,
>> test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
>> test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS
>> author_name
>> FROM test_tmp
>> INNER JOIN data_with_numbers
>> ON data_with_numbers.branch_name = test_tmp.`ref`
>> )
>> SELECT
>> branch_name,
>> commit_hash,
>> author_name
>> FROM data_exploded
>> -- WHERE author_name IS NOT NULL - not possible here due to `Non
>> equi-join is not supported`
>> -- as it pushes this condition as predicate pushdown to join.
>> -- Is there any way to force checking this condition on here and not to
>> project it upstream?
>> ;
>> -----------------------
>> -- 5. single UNION ALL possible
>> -----------------------
>> SELECT 1 AS a
>> UNION ALL
>> SELECT 2 AS a
>> UNION ALL
>> SELECT 3 AS a;
>> -- Wrong number of arguments to BeamUnionRel:
>> org.apache.beam.sdk.values.PCollectionList@70f145ac
>> -----------------------
>> -- 6. Reserved names
>> -----------------------
>> -- json_object
>> SELECT '{}' AS json_object;
>> -- parse failed: Incorrect syntax near the keyword 'AS' at line 1, column
>> 13.
>> -- WORKAROUND SELECT '{}' AS `json_object`
>> -----------------------
>> -- 7. Windowing function on stream
>> -----------------------
>> -- in detail - How to get previous message for a key?
>> -- setting expiration arbitrary big is ok, but access to the previous
>> record must happen fairly quickly
>> -- not wait for the big window to finish and emit the expired keys.
>> -- Ideally would like to do it in pure beam pipeline as saving to some
>> external key/value store
>> -- and then reading this here could potentially result in some race
>> conditions which would be hard to debug.
>> DROP TABLE IF EXISTS unbounded_stream;
>> CREATE EXTERNAL TABLE unbounded_stream(
>> sequence BIGINT,
>> event_time TIMESTAMP
>> )
>> TYPE 'sequence'
>> TBLPROPERTIES '{"elementsPerSecond":1}'
>> ;
>> CREATE EXTERNAL TABLE data_1_bounded(
>> `sequence_nb` BIGINT,
>> `sender_login` VARCHAR,
>> `user_id` VARCHAR
>> )
>> TYPE text
>> LOCATION
>> 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
>> TBLPROPERTIES '{"format":"json",
>> "deadLetterFile":"python/dbt/tests/dead_letters/dead"}'
>> ;
>> WITH
>> test_data_1_unbounded AS (
>> SELECT
>> sender_login,
>> user_id,
>> event_time
>> FROM unbounded_stream
>> INNER JOIN data_1_bounded
>> ON unbounded_stream.sequence = data_1_bounded.sequence_nb
>> ),
>> test_data_1_lookbehind AS (
>> SELECT
>> sender_login,
>> LAST_VALUE(user_id) OVER previous_win AS user_id
>> FROM test_data_1_unbounded
>> WINDOW previous_win AS (
>> PARTITION BY sender_login
>> ORDER BY event_time ASC
>> ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
>> )
>> )
>> SELECT *
>> FROM test_data_1_lookbehind
>> LIMIT 8
>> ;
>> -- There are not enough rules to produce a node with desired properties:
>> convention=ENUMERABLE. All the inputs have relevant nodes, however the cost
>> is still infinite.
>> -- Root: rel#29:RelSubset#4.ENUMERABLE
>> -- Original rel:
>> -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost =
>> {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
>> -- LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4) OVER
>> (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount = 1.2,
>> cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
>> -- LogicalJoin(condition=[=($0, $2)], joinType=[inner]): rowcount = 1.2,
>> cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io}, id = 12
>> -- BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount = 1.0,
>> cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
>> -- BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount = 8.0,
>> cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
>> --
>> -- Sets:
>> -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
>> -- rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
>> -- rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, unbounded_stream]),
>> rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 1.0 io}
>> -- rel#33:RelSubset#0.ENUMERABLE, best=rel#32
>> -- rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18),
>> rowcount=1.0, cumulative cost={1.7976931348623157E308 rows,
>> 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>> -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR sender_login,
>> VARCHAR user_id)
>> -- rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
>> -- rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, data_1_bounded]),
>> rowcount=8.0, cumulative cost={8.0 rows, 8.0 cpu, 8.0 io}
>> -- rel#36:RelSubset#1.ENUMERABLE, best=rel#35
>> -- rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19),
>> rowcount=8.0, cumulative cost={1.7976931348623157E308 rows,
>> 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>> -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time,
>> BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
>> -- rel#21:RelSubset#2.NONE, best=null
>> --
>> rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0,
>> $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
>> -- rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4, $0, $1,
>> $2]), rowcount=1.2, cumulative cost={inf}
>> --
>> rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2),
>> rowcount=1.2, cumulative cost={inf}
>>
>>
>>
>>

Re: Beam SQL found limitations

Posted by Kenneth Knowles <ke...@apache.org>.
1. Yes, using state is a better fit than Beam windowing. You will want to
use the new feature of annotating the DoFn with @RequiresTimeSortedInput.
This will make it so you can be sure you are actually getting the
"previous" event. They can arrive in any order without this annotation. You
won't be able to do this in SQL. I don't think Beam SQL has implementations
of analytic functions that have this ability.
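
For a feel of why the sliding-window workaround quoted below is a heavier fit than state, here is a rough plain-Python sketch of how many windows a single element logically falls into. The helper names are hypothetical, window starts are assumed to be aligned to multiples of the period, and the size is assumed to be a whole multiple of the period. How much of this a runner physically stores is runner-dependent and not something this sketch can answer.

```python
from datetime import timedelta
from typing import List


def windows_per_element(size: timedelta, period: timedelta) -> int:
    """Number of sliding windows of length `size`, started every `period`,
    that contain one given timestamp."""
    if size % period != timedelta(0):
        raise ValueError("this sketch assumes size is a whole multiple of period")
    return size // period


def sliding_window_starts(timestamp: int, size: int, period: int) -> List[int]:
    """Start times of all windows [start, start + size) that contain
    `timestamp`, with starts aligned to multiples of `period` (plain integers)."""
    latest_start = timestamp - timestamp % period
    return list(range(latest_start, timestamp - size, -period))


# The HOP from the quoted query: 1 second period, 7 day size.
print(windows_per_element(timedelta(days=7), timedelta(seconds=1)))  # 604800

# A small case that is easy to check by hand: size 5, period 1, element at t=10.
print(sliding_window_starts(10, 5, 1))  # [10, 9, 8, 7, 6]
```

So with a 1 second period and a 7 day size, every event logically belongs to 604800 windows per key, which is the cost the question about duplication is pointing at.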

Kenn

On Wed, May 31, 2023 at 4:17 AM Wiśniowski Piotr <
contact.wisniowskipiotr@gmail.com> wrote:

> Hi Kenn,
>
> Thanks for clarification.
>
> 1. Just to put an example in front - for every event that comes in I need
> to find the corresponding previous event of the same user_id and pass
> previous_event_timestamp down in the current event payload (the current
> event also becomes the previous event for future events that come in for
> the same user). The question is how to do it with BeamSQL. I am aware that
> analytic windowing (like last_value over etc.) might not be a way for
> streaming and I am ok with this - it makes sense under the hood, just as you
> mention.
>
> The task is to be able to keep a simple state in streaming SQL. What I
> came up with is using a sliding window to have this state available for each
> new event that comes in.
>
> ```
>
> WITH
> unbounded_stream_initialized AS (
>     SELECT
>         user_id,
>         event_time
>     FROM unbounded_stream
>     GROUP BY
>         user_id,
>         event_time,
>         TUMBLE(event_time,INTERVAL '1' SECONDS)
>     UNION ALL
>     -- this is needed as first session window by default starts at first
>     -- element, while here we need to start it in the past
>     -- so that there is a window that ends just after first real element
>     SELECT
>         CAST(0 AS BIGINT) AS user_id,
>         CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
>     -- this is dummy as syntax does not allow to have GROUP BY just after
>     -- SELECT
>     FROM UNNEST(ARRAY[0]) AS arr
>     GROUP BY TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP), INTERVAL '1'
> SECONDS)
> ),
> test_data_1 AS (
>     SELECT
>         user_id,
>         MAX(event_time) AS prev_event_time,
>         HOP_END(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS) AS
> window_end_at
>     FROM unbounded_stream_initialized
>     GROUP BY
>         user_id,
>         HOP(
>             -- first create a sliding window to aggregate state
>             event_time,
>             INTERVAL '1' SECONDS,
>             -- The idea is to have this quite long compared to interval
>             INTERVAL '7' DAYS
>         )
> ),
> test_data_1_lookup AS (
>     SELECT
>         user_id,
>         prev_event_time
>     FROM test_data_1
>     GROUP BY
>         user_id,
>         -- then re-window into windows suitable for joining main stream
>         TUMBLE(window_end_at, INTERVAL '1' SECONDS)
> ),
> enriched_info AS (
>     SELECT
>         unbounded_stream_initialized.event_time AS event_timestamp,
>         unbounded_stream_initialized.user_id AS user_id,
>         test_data_1_lookup.prev_event_time AS prev_event_time
>     FROM unbounded_stream_initialized
>     LEFT JOIN test_data_1_lookup
>         ON unbounded_stream_initialized.user_id = test_data_1_lookup.user_id
> )
> SELECT
>     *
> FROM enriched_info
>
> ```
>
> My doubt is whether the above will store too much redundant data:
> `test_data_1` suggests that each incoming message could be duplicated and
> stored in every window of the sliding window definition (which might be a
> lot in this case). Or is window membership actually resolved later, when
> the `LEFT JOIN` is evaluated? The target is Dataflow. I am still learning
> Beam, so I may be missing something fundamental about how this is
> processed.
>
> 2. Any hints on implementing FirestoreIOTableProvider? Just roughly how to
> do it, where to look for the important parts, etc. It seems we would need
> this functionality.
>
> 3. I will try to report some more interesting findings. If possible, please
> prioritize fixing this ROW error.
>
> Best
>
> Piotr
>
> On 26.05.2023 21:36, Kenneth Knowles wrote:
>
> Just want to clarify that Beam's concept of windowing is really an
> event-time based key, and they are all processed logically simultaneously.
> SQL's concept of windowing function is to sort rows and process them
> linearly. They are actually totally different. From your queries it seems
> you are interested in SQL's windowing functions (aka analytic functions).
>
> I am surprised by the problems with rows, since we have used them
> extensively. Hopefully it is not too hard to fix. Same with the UNION ALL
> problem.
>
> And for the CROSS JOIN it would be a nice feature to allow in some cases
> it seems. Should not be hard.
>
> Thank you for reporting this! If you have time it would be really great to
> get each of these reproducible problems into its own GitHub issue.
>
> Kenn
>
> On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr <
> contact.wisniowskipiotr@gmail.com> wrote:
>
>> Hi Alexey,
>>
>> Thank You for reference to that discussion I do actually have pretty
>> similar thoughts on what Beam SQL needs.
>>
>> Update from my side:
>>
>> I did find a workaround for the issue with windowing functions on a
>> stream. It basically boils down to using a sliding window to collect and
>> aggregate the state. But I would need advice on whether this is actually a
>> cost-efficient method (targeting the Dataflow runner). My doubt is that
>> this sliding window would need a sliding interval of less than 1s and a
>> size of more than a week, and would be fed with quite frequent data. If I
>> understand this correctly, each input row would need to be duplicated for
>> each window and stored, which could mean a quite significant storage cost?
>>
>> Or does Beam not physically duplicate the record, but just track which
>> windows the record currently belongs to?
>>
>>
>> And the real issue that BeamSQL needs at the moment in my opinion is
>> fixing bugs.
>>
>> Some bugs that I found that prevent one from using it and would really
>> appreciate fast fix:
>>
>> - UNNEST ARRAY with a nested ROW (described below, created ticket -
>> https://github.com/apache/beam/issues/26911)
>>
>> - PubSub table provider actually requires all table properties to be
>> there (with null in `timestampAttributeKey` it fails) - which essentially
>> does not allow one to use pubsub publish timestamp as
>> `timestampAttributeKey`.
>>
>> - its not possible to cast VARCHAR to BYTES. And BYTES is needed for
>> DataStoreV1TableProvider to provide a key for storage. Also consider
>> updating `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it
>> requires VARCHAR instead of BYTES - its even easier in implementation.
>>
>> - Any hints on how to implement `FireStoreIOTableProvider`? I am
>> considering implementing it and contributing depending on my team decision
>> - but would like to get like idea how hard this task is.
>>
>> Will create tickets for the rest of issues when I will have some spare
>> time.
>>
>> Best regards
>>
>> Wiśniowski Piotr
>>
>>
>> On 22.05.2023 18:28, Alexey Romanenko wrote:
>>
>> Hi Piotr,
>>
>> Thanks for details! I cross-post this to dev@ as well since, I guess,
>> people there can provide more insights on this.
>>
>> A while ago, I faced the similar issues trying to run Beam SQL against
>> TPC-DS benchmark.
>> We had a discussion around that [1], please, take a look since it can be
>> helpful.
>>
>> [1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b
>>
>> —
>> Alexey
>>
>> On 18 May 2023, at 11:36, Wiśniowski Piotr
>> <co...@gmail.com> wrote:
>>
>> HI,
>>
>> After experimenting with Beam SQL I did find some limitations. Testing on
>> near latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`)
>> with Calcite, direct runner and openjdk version "11.0.19". Please let me
>> know if some of them are known/ worked on/ have tickets or have estimated
>> fix time. I believe most of them are low hanging fruits or just my thinking
>> is not right for the problem. If this is the case please guide me to some
>> working solution.
>>
>>  From my perspective it is ok to have a fix just on master - no need to
>> wait for release. Priority order:
>> - 7. Windowing function on a stream - in detail - How to get previous
>> message for a key? setting expiration arbitrary big is ok, but access to
>> the previous record must happen fairly quickly not wait for the big window
>> to finish and emit the expired keys. Ideally would like to do it in pure
>> beam pipeline as saving to some external key/value store and then reading
>> this here could potentially result in some race conditions which in I would
>> like to avoid, but if its the only option - let it be.
>> - 5. single UNION ALL possible
>> - 4. UNNEST ARRAY with nested ROW
>> - 3. Using * when there is Row type present in the schema
>> - 1. `CROSS JOIN` between two unrelated tables is not supported - even if
>> one is a static number table
>> - 2. ROW construction not supported. It is not possible to nest data
>>
>> Below are the queries that I use to test these scenarios.
>>
>> Thank you for looking at these topics!
>>
>> Best
>>
>> Wiśniowski Piotr
>> -----------------------
>> -- 1. `CROSS JOIN` between two unrelated tables is not supported.
>> -----------------------
>> -- Only supported is `CROSS JOIN UNNEST` when exploding array from same
>> table.
>> -- It is not possible to number rows
>> WITH data_table AS (
>> SELECT 1 AS a
>> ),
>> number_table AS (
>> SELECT
>> numbers_exploded AS number_item
>> FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS
>> numbers_exploded
>> )
>> SELECT
>> data_table.a,
>> number_table.number_item
>> FROM data_table
>> CROSS JOIN number_table
>> ;
>> -- CROSS JOIN, JOIN ON FALSE is not supported!
>> -----------------------
>> -- 2. ROW construction not supported. It is not possible to nest data
>> -----------------------
>> SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>> SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>> SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same
>> type
>> SELECT MAP['field1','b','field2','a']; -- null
>> -- WORKAROUND - manually compose json string,
>> -- drawback - decomposing might be not supported or would need to be also
>> based on string operations
>> SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS
>> `json_object`;
>> -----------------------
>> -- 3. Using * when there is Row type present in the schema
>> -----------------------
>> CREATE EXTERNAL TABLE test_tmp_1(
>> `ref` VARCHAR,
>> `author` ROW<
>> `name` VARCHAR,
>> `email` VARCHAR
>> >
>> )
>> TYPE text
>> LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
>> TBLPROPERTIES '{"format":"json",
>> "deadLetterFile":"top/python/dbt/tests/dead"}';
>> SELECT * FROM test_tmp_1;
>> -- java.lang.NoSuchFieldException: name
>> -- WORKAROUND - refer to columns explicitly with alias
>> SELECT
>> `ref` AS ref_value,
>> test_tmp_1.`author`.`name` AS author_name, -- table name must be
>> referenced explicitly - this could be fixed too
>> test_tmp_1.`author`.`email` AS author_email
>> FROM test_tmp_1;
>> -----------------------
>> -- 4. UNNEST ARRAY with nested ROW
>> -----------------------
>> CREATE EXTERNAL TABLE test_tmp(
>> `ref` VARCHAR,
>> `commits` ARRAY<ROW<
>> `id` VARCHAR,
>> `author` ROW<
>> `name` VARCHAR,
>> `email` VARCHAR
>> >
>> >>
>> )
>> TYPE text
>> LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
>> TBLPROPERTIES '{"format":"json",
>> "deadLetterFile":"python/dbt/tests/dead"}';
>> SELECT
>> test_tmp.`ref` AS branch_name,
>> commit_item.`id` AS commit_hash,
>> commit_item.`author`.`name` AS author_name
>> FROM test_tmp
>> CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
>> -- Row expected 4 fields (Field{name=ref, description=, type=STRING,
>> options={{}}}, Field{name=commits, description=, type=ARRAY<ROW<id STRING,
>> author ROW<name STRING, email STRING>> NOT NULL>, options={{}}},
>> Field{name=id, description=, type=STRING, options={{}}}, Field{name=author,
>> description=, type=ROW<name STRING, email STRING>, options={{}}}).
>> initialized with 5 fields.
>> -- limited WORKAROUND - refer to array elements by index and UNION ALL
>> the items into rows
>> -- note workaround that uses number table will not work as CROSS JOIN is
>> not supported
>> WITH data_parsed AS (
>> SELECT
>> test_tmp.`ref` AS branch_id,
>> test_tmp.commits[1].`id` AS commit_hash,
>> test_tmp.commits[1].`author`.`name` AS author_name
>> FROM test_tmp
>> UNION ALL -- this unfortunately works only for two indexes
>> SELECT
>> test_tmp.`ref` AS branch_id,
>> test_tmp.commits[2].`id` AS commit_hash,
>> test_tmp.commits[2].`author`.`name` AS author_name
>> FROM test_tmp
>> )
>> SELECT *
>> FROM data_parsed
>> WHERE author_name IS NOT NULL
>> ;
>> -- better WORKAROUND - but tricky to get right (fragile)
>> WITH data_with_number_array AS (
>> SELECT
>> test_tmp.`ref` AS branch_name, -- there must be some primary key in the
>> data to join on later due to CROSS JOIN support limitation
>> ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
>> CARDINALITY(test_tmp.commits) AS commits_size
>> FROM test_tmp
>> ),
>> data_with_numbers AS (
>> SELECT
>> branch_name,
>> `EXPR$0` AS number_item
>> FROM data_with_number_array
>> CROSS JOIN UNNEST(data_with_number_array.number_array) AS exploded
>> WHERE `EXPR$0` <= commits_size
>> ),
>> data_exploded AS (
>> SELECT
>> test_tmp.`ref` AS branch_name,
>> test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
>> test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS
>> author_name
>> FROM test_tmp
>> INNER JOIN data_with_numbers
>> ON data_with_numbers.branch_name = test_tmp.`ref`
>> )
>> SELECT
>> branch_name,
>> commit_hash,
>> author_name
>> FROM data_exploded
>> -- WHERE author_name IS NOT NULL - not possible here due to `Non
>> equi-join is not supported`
>> -- as it pushes this condition as predicate pushdown to join.
>> -- Is there any way to force checking this condition on here and not to
>> project it upstream?
>> ;
>> -----------------------
>> -- 5. single UNION ALL possible
>> -----------------------
>> SELECT 1 AS a
>> UNION ALL
>> SELECT 2 AS a
>> UNION ALL
>> SELECT 3 AS a;
>> -- Wrong number of arguments to BeamUnionRel:
>> org.apache.beam.sdk.values.PCollectionList@70f145ac
>> -----------------------
>> -- 6. Reserved names
>> -----------------------
>> -- json_object
>> SELECT '{}' AS json_object;
>> -- parse failed: Incorrect syntax near the keyword 'AS' at line 1, column
>> 13.
>> -- WORKAROUND SELECT '{}' AS `json_object`
>> -----------------------
>> -- 7. Windowing function on stream
>> -----------------------
>> -- in detail - How to get previous message for a key?
>> -- setting expiration arbitrary big is ok, but access to the previous
>> record must happen fairly quickly
>> -- not wait for the big window to finish and emit the expired keys.
>> -- Ideally would like to do it in pure beam pipeline as saving to some
>> external key/value store
>> -- and then reading this here could potentially result in some race
>> conditions which would be hard to debug.
>> DROP TABLE IF EXISTS unbounded_stream;
>> CREATE EXTERNAL TABLE unbounded_stream(
>> sequence BIGINT,
>> event_time TIMESTAMP
>> )
>> TYPE 'sequence'
>> TBLPROPERTIES '{"elementsPerSecond":1}'
>> ;
>> CREATE EXTERNAL TABLE data_1_bounded(
>> `sequence_nb` BIGINT,
>> `sender_login` VARCHAR,
>> `user_id` VARCHAR
>> )
>> TYPE text
>> LOCATION
>> 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
>> TBLPROPERTIES '{"format":"json",
>> "deadLetterFile":"python/dbt/tests/dead_letters/dead"}'
>> ;
>> WITH
>> test_data_1_unbounded AS (
>> SELECT
>> sender_login,
>> user_id,
>> event_time
>> FROM unbounded_stream
>> INNER JOIN data_1_bounded
>> ON unbounded_stream.sequence = data_1_bounded.sequence_nb
>> ),
>> test_data_1_lookbehind AS (
>> SELECT
>> sender_login,
>> LAST_VALUE(user_id) OVER previous_win AS user_id
>> FROM test_data_1_unbounded
>> WINDOW previous_win AS (
>> PARTITION BY sender_login
>> ORDER BY event_time ASC
>> ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
>> )
>> )
>> SELECT *
>> FROM test_data_1_lookbehind
>> LIMIT 8
>> ;
>> -- There are not enough rules to produce a node with desired properties:
>> convention=ENUMERABLE. All the inputs have relevant nodes, however the cost
>> is still infinite.
>> -- Root: rel#29:RelSubset#4.ENUMERABLE
>> -- Original rel:
>> -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost =
>> {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
>> -- LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4) OVER
>> (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount = 1.2,
>> cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
>> -- LogicalJoin(condition=[=($0, $2)], joinType=[inner]): rowcount = 1.2,
>> cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io}, id = 12
>> -- BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount = 1.0,
>> cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
>> -- BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount = 8.0,
>> cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
>> --
>> -- Sets:
>> -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
>> -- rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
>> -- rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, unbounded_stream]),
>> rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 1.0 io}
>> -- rel#33:RelSubset#0.ENUMERABLE, best=rel#32
>> -- rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18),
>> rowcount=1.0, cumulative cost={1.7976931348623157E308 rows,
>> 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>> -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR sender_login,
>> VARCHAR user_id)
>> -- rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
>> -- rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, data_1_bounded]),
>> rowcount=8.0, cumulative cost={8.0 rows, 8.0 cpu, 8.0 io}
>> -- rel#36:RelSubset#1.ENUMERABLE, best=rel#35
>> -- rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19),
>> rowcount=8.0, cumulative cost={1.7976931348623157E308 rows,
>> 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>> -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time,
>> BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
>> -- rel#21:RelSubset#2.NONE, best=null
>> --
>> rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0,
>> $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
>> -- rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4, $0, $1,
>> $2]), rowcount=1.2, cumulative cost={inf}
>> --
>> rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2),
>> rowcount=1.2, cumulative cost={inf}
>>
>>
>>
>>

Re: Beam SQL found limitations

Posted by Wiśniowski Piotr <co...@gmail.com>.
Hi Kenn,

Thanks for the clarification.

1. To put an example up front: for every event that comes in, I need to
find the previous event with the same user_id and pass its timestamp
down in the current event's payload as previous_event_timestamp (the
current event then becomes the previous event for future events of the
same user). The question is how to do this with BeamSQL. I am aware that
analytic windowing (LAST_VALUE ... OVER etc.) might not be the way to go
for streaming, and I am OK with that - it makes sense under the hood,
just as you mention.
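
To make the requirement concrete, here is a minimal sketch in plain Python
(no Beam involved) of the per-key behaviour I am after. The event shape
`(user_id, event_time)` and the function name are made up for illustration,
and the sketch assumes events arrive in event-time order, which a real
stream does not guarantee.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

# Hypothetical event shape: (user_id, event_time as epoch seconds).
Event = Tuple[str, int]


def with_previous_event_time(
    events: Iterable[Event],
) -> Iterator[Tuple[str, int, Optional[int]]]:
    """Yield (user_id, event_time, previous_event_time) in arrival order."""
    # Per-key state: the last event_time seen for each user_id.
    last_seen: Dict[str, int] = {}
    for user_id, event_time in events:
        # Emit immediately; None means this is the first event for the key.
        yield user_id, event_time, last_seen.get(user_id)
        # The current event becomes the "previous" one for the next event.
        last_seen[user_id] = event_time


events = [("u1", 100), ("u2", 105), ("u1", 130), ("u1", 131), ("u2", 200)]
result = list(with_previous_event_time(events))
for row in result:
    print(row)
```

The important property is that each event is emitted right away with the
state of its key, instead of waiting for some large window to close.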

The task is to keep simple state in streaming SQL. What I came up with
is using a sliding window to make this state available for each new
event that comes in.

```

WITH
unbounded_stream_initialized AS (
     SELECT
         user_id,
         event_time
     FROM unbounded_stream
     GROUP BY
         user_id,
         event_time,
         TUMBLE(event_time,INTERVAL '1' SECONDS)
     UNION ALL
     -- this is needed as the first session window by default starts at the
     -- first element, while here we need to start it in the past
     -- so that there is a window that ends just after the first real element
     SELECT
         CAST(0 AS BIGINT) AS user_id,
         CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
     -- dummy FROM, as the syntax does not allow GROUP BY directly after SELECT
     FROM UNNEST(ARRAY[0]) AS arr
     GROUP BY TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP), INTERVAL '1' SECONDS)
),
test_data_1 AS (
     SELECT
         user_id,
         MAX(event_time) AS prev_event_time,
         HOP_END(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS) AS window_end_at
     FROM unbounded_stream_initialized
     GROUP BY
         user_id,
         HOP(
             -- first create a sliding window to aggregate state
             event_time,
             INTERVAL '1' SECONDS,
             INTERVAL '7' DAYS -- the idea is to have this quite long compared to the interval
         )
),
test_data_1_lookup AS (
     SELECT
         user_id,
         prev_event_time
     FROM test_data_1
     GROUP BY
         user_id,
         -- then re-window into windows suitable for joining main stream
         TUMBLE(window_end_at, INTERVAL '1' SECONDS)
),
enriched_info AS (
     SELECT
         unbounded_stream_initialized.event_time AS event_timestamp,
         unbounded_stream_initialized.user_id AS user_id,
         test_data_1_lookup.prev_event_time AS prev_event_time
     FROM unbounded_stream_initialized
     LEFT JOIN test_data_1_lookup
         ON unbounded_stream_initialized.user_id = test_data_1_lookup.user_id
)
SELECT
     *
FROM enriched_info

```

My doubt is whether the above will store too much redundant data:
`test_data_1` suggests that each incoming message could be duplicated
and stored in every window of the sliding window definition (which might
be a lot in this case). Or is window membership actually resolved
later, when the `LEFT JOIN` is evaluated? The target is Dataflow. I am
still learning Beam, so I may be missing something fundamental about
how this is processed.
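
To put a number on "a lot", here is a rough sketch in plain Python of how
many sliding windows a single element falls into. It assumes windows start
at multiples of the period and cover `[start, start + size)`; the function
name and the sample timestamp are made up for illustration. It only counts
logical window assignments and says nothing about how a runner such as
Dataflow physically stores them.

```python
def sliding_window_starts(timestamp_s: int, size_s: int, period_s: int) -> list:
    """Start times (seconds) of every window [start, start + size_s)
    that contains timestamp_s, assuming starts at multiples of period_s."""
    last_start = timestamp_s - (timestamp_s % period_s)
    # Walk backwards one period at a time while the window still covers
    # the timestamp, i.e. while start > timestamp_s - size_s.
    return list(range(last_start, timestamp_s - size_s, -period_s))


ONE_SECOND = 1
SEVEN_DAYS = 7 * 24 * 60 * 60  # 604800 seconds

# Small case that is easy to check by hand: size 6, period 2.
small = sliding_window_starts(timestamp_s=10, size_s=6, period_s=2)

# The case from the query: 1 second period, 7 day size.
# The timestamp is an arbitrary epoch-seconds value.
large = sliding_window_starts(
    timestamp_s=1_684_483_200, size_s=SEVEN_DAYS, period_s=ONE_SECOND
)

print(small)       # window starts 10, 8 and 6
print(len(large))  # size / period windows for one element
```

With a 1 second period and a 7 day size, every element belongs to
7 * 24 * 60 * 60 = 604800 windows, which is why I worry about the cost.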

2. Any hints on implementing FirestoreIOTableProvider? Just roughly how
to do it, where to look for the important parts, etc. It seems we would
need this functionality.

3. I will try to report some more interesting findings. If possible,
please prioritize fixing this ROW error.

Best

Piotr

On 26.05.2023 21:36, Kenneth Knowles wrote:
> Just want to clarify that Beam's concept of windowing is really an 
> event-time based key, and they are all processed logically 
> simultaneously. SQL's concept of windowing function is to sort rows 
> and process them linearly. They are actually totally different. From 
> your queries it seems you are interested in SQL's windowing functions 
> (aka analytic functions).
>
> I am surprised by the problems with rows, since we have used them 
> extensively. Hopefully it is not too hard to fix. Same with the UNION 
> ALL problem.
>
> And for the CROSS JOIN it would be a nice feature to allow in some 
> cases it seems. Should not be hard.
>
> Thank you for reporting this! If you have time it would be really
> great to get each of these reproducible problems into its own GitHub issue.
>
> Kenn
>
> On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr 
> <co...@gmail.com> wrote:
>
>     Hi Alexey,
>
>     Thank You for reference to that discussion I do actually have
>     pretty similar thoughts on what Beam SQL needs.
>
>     Update from my side:
>
>     I did find a workaround for the issue with windowing functions on
>     a stream. It basically boils down to using a sliding window to
>     collect and aggregate the state. But I would need advice on whether
>     this is actually a cost-efficient method (targeting the Dataflow
>     runner). My doubt is that this sliding window would need a sliding
>     interval of less than 1s and a size of more than a week, and would
>     be fed with quite frequent data. If I understand this correctly,
>     each input row would need to be duplicated for each window and
>     stored, which could mean a quite significant storage cost?
>
>     Or does Beam not physically duplicate the record, but just track
>     which windows the record currently belongs to?
>
>
>     And the real issue that BeamSQL needs at the moment in my opinion
>     is fixing bugs.
>
>     Some bugs that I found that prevent one from using it and would
>     really appreciate fast fix:
>
>     - UNNEST ARRAY with a nested ROW (described below, created ticket
>     - https://github.com/apache/beam/issues/26911)
>
>     - PubSub table provider actually requires all table properties to
>     be there (with null in `timestampAttributeKey` it fails) - which
>     essentially does not allow one to use pubsub publish timestamp as
>     `timestampAttributeKey`.
>
>     - its not possible to cast VARCHAR to BYTES. And BYTES is needed
>     for DataStoreV1TableProvider to provide a key for storage. Also
>     consider updating
>     `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it
>     requires VARCHAR instead of BYTES - its even easier in implementation.
>
>     - Any hints on how to implement `FireStoreIOTableProvider`? I am
>     considering implementing it and contributing depending on my team
>     decision - but would like to get like idea how hard this task is.
>
>     Will create tickets for the rest of issues when I will have some
>     spare time.
>
>     Best regards
>
>     Wiśniowski Piotr
>
>
>     On 22.05.2023 18:28, Alexey Romanenko wrote:
>>     Hi Piotr,
>>
>>     Thanks for details! I cross-post this to dev@ as well since, I
>>     guess, people there can provide more insights on this.
>>
>>     A while ago, I faced the similar issues trying to run Beam SQL
>>     against TPC-DS benchmark.
>>     We had a discussion around that [1], please, take a look since it
>>     can be helpful.
>>
>>     [1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b
>>
>>     —
>>     Alexey
>>
>>>     On 18 May 2023, at 11:36, Wiśniowski Piotr
>>>     <co...@gmail.com> wrote:
>>>
>>>     HI,
>>>
>>>     After experimenting with Beam SQL I did find some limitations.
>>>     Testing on near latest main (precisely
>>>     `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite, direct
>>>     runner and openjdk version "11.0.19". Please let me know if some
>>>     of them are known/ worked on/ have tickets or have estimated fix
>>>     time. I believe most of them are low hanging fruits or just my
>>>     thinking is not right for the problem. If this is the case
>>>     please guide me to some working solution.
>>>
>>>      From my perspective it is ok to have a fix just on master - no
>>>     need to wait for release. Priority order:
>>>     - 7. Windowing function on a stream - in detail - How to get
>>>     previous message for a key? setting expiration arbitrary big is
>>>     ok, but access to the previous record must happen fairly quickly
>>>     not wait for the big window to finish and emit the expired keys.
>>>     Ideally would like to do it in pure beam pipeline as saving to
>>>     some external key/value store and then reading this here could
>>>     potentially result in some race conditions which in I would like
>>>     to avoid, but if its the only option - let it be.
>>>     - 5. single UNION ALL possible
>>>     - 4. UNNEST ARRAY with nested ROW
>>>     - 3. Using * when there is Row type present in the schema
>>>     - 1. `CROSS JOIN` between two unrelated tables is not supported
>>>     - even if one is a static number table
>>>     - 2. ROW construction not supported. It is not possible to nest data
>>>
>>>     Below are the queries that I use to test these scenarios.
>>>
>>>     Thank you for looking at these topics!
>>>
>>>     Best
>>>
>>>     Wiśniowski Piotr
>>>
>>>     -----------------------
>>>     -- 1. `CROSS JOIN` between two unrelated tables is not supported.
>>>     -----------------------
>>>     -- Only supported is `CROSS JOIN UNNEST` when exploding array
>>>     from same table.
>>>     -- It is not possible to number rows
>>>     WITH data_table AS (
>>>     SELECT 1 AS a
>>>     ),
>>>     number_table AS (
>>>     SELECT
>>>     numbers_exploded AS number_item
>>>     FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16])
>>>     AS numbers_exploded
>>>     )
>>>     SELECT
>>>     data_table.a,
>>>     number_table.number_item
>>>     FROM data_table
>>>     CROSS JOIN number_table
>>>     ;
>>>     -- CROSS JOIN, JOIN ON FALSE is not supported!
>>>     -----------------------
>>>     -- 2. ROW construction not supported. It is not possible to nest
>>>     data
>>>     -----------------------
>>>     SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>>>     SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>>>     SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
>>>     SELECT MAP['field1','b','field2','a']; -- null
>>>     -- WORKAROUND - manually compose json string,
>>>     -- drawback - decomposing might be not supported or would need
>>>     -- to be also based on string operations
>>>     SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS `json_object`;
>>>     -----------------------
>>>     -- 3. Using * when there is Row type present in the schema
>>>     -----------------------
>>>     CREATE EXTERNAL TABLE test_tmp_1(
>>>     `ref` VARCHAR,
>>>     `author` ROW<
>>>     `name` VARCHAR,
>>>     `email` VARCHAR
>>>     >
>>>     )
>>>     TYPE text
>>>     LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
>>>     TBLPROPERTIES '{"format":"json",
>>>     "deadLetterFile":"top/python/dbt/tests/dead"}';
>>>     SELECT * FROM test_tmp_1;
>>>     -- java.lang.NoSuchFieldException: name
>>>     -- WORKAROUND - refer to columns explicitly with alias
>>>     SELECT
>>>     `ref` AS ref_value,
>>>     -- table name must be referenced explicitly - this could be fixed too
>>>     test_tmp_1.`author`.`name` AS author_name,
>>>     test_tmp_1.`author`.`email` AS author_email
>>>     FROM test_tmp_1;
>>>     -----------------------
>>>     -- 4. UNNEST ARRAY with nested ROW
>>>     -----------------------
>>>     CREATE EXTERNAL TABLE test_tmp(
>>>     `ref` VARCHAR,
>>>     `commits` ARRAY<ROW<
>>>     `id` VARCHAR,
>>>     `author` ROW<
>>>     `name` VARCHAR,
>>>     `email` VARCHAR
>>>     >
>>>     >>
>>>     )
>>>     TYPE text
>>>     LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
>>>     TBLPROPERTIES '{"format":"json",
>>>     "deadLetterFile":"python/dbt/tests/dead"}';
>>>     SELECT
>>>     test_tmp.`ref` AS branch_name,
>>>     commit_item.`id` AS commit_hash,
>>>     commit_item.`author`.`name` AS author_name
>>>     FROM test_tmp
>>>     CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
>>>     -- Row expected 4 fields (Field{name=ref, description=,
>>>     type=STRING, options={{}}}, Field{name=commits, description=,
>>>     type=ARRAY<ROW<id STRING, author ROW<name STRING, email STRING>>
>>>     NOT NULL>, options={{}}}, Field{name=id, description=,
>>>     type=STRING, options={{}}}, Field{name=author, description=,
>>>     type=ROW<name STRING, email STRING>, options={{}}}). initialized
>>>     with 5 fields.
>>>     -- limited WORKAROUND - refer to array elements by index and
>>>     UNION ALL the items into rows
>>>     -- note workaround that uses number table will not work as CROSS
>>>     JOIN is not supported
>>>     WITH data_parsed AS (
>>>     SELECT
>>>     test_tmp.`ref` AS branch_id,
>>>     test_tmp.commits[1].`id` AS commit_hash,
>>>     test_tmp.commits[1].`author`.`name` AS author_name
>>>     FROM test_tmp
>>>     UNION ALL -- this unfortunately works only for two indexes
>>>     SELECT
>>>     test_tmp.`ref` AS branch_id,
>>>     test_tmp.commits[2].`id` AS commit_hash,
>>>     test_tmp.commits[2].`author`.`name` AS author_name
>>>     FROM test_tmp
>>>     )
>>>     SELECT *
>>>     FROM data_parsed
>>>     WHERE author_name IS NOT NULL
>>>     ;
>>>     -- better WORKAROUND - but tricky to get right (fragile)
>>>     WITH data_with_number_array AS (
>>>     SELECT
>>>     -- there must be some primary key in the data to join on later
>>>     -- due to CROSS JOIN support limitation
>>>     test_tmp.`ref` AS branch_name,
>>>     ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
>>>     CARDINALITY(test_tmp.commits) AS commits_size
>>>     FROM test_tmp
>>>     ),
>>>     data_with_numbers AS (
>>>     SELECT
>>>     branch_name,
>>>     `EXPR$0` AS number_item
>>>     FROM data_with_number_array
>>>     CROSS JOIN UNNEST(data_with_number_array.number_array) AS exploded
>>>     WHERE `EXPR$0` <= commits_size
>>>     ),
>>>     data_exploded AS (
>>>     SELECT
>>>     test_tmp.`ref` AS branch_name,
>>>     test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
>>>     test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS author_name
>>>     FROM test_tmp
>>>     INNER JOIN data_with_numbers
>>>     ON data_with_numbers.branch_name = test_tmp.`ref`
>>>     )
>>>     SELECT
>>>     branch_name,
>>>     commit_hash,
>>>     author_name
>>>     FROM data_exploded
>>>     -- WHERE author_name IS NOT NULL - not possible here due to `Non
>>>     equi-join is not supported`
>>>     -- as it pushes this condition as predicate pushdown to join.
>>>     -- Is there any way to force checking this condition on here and
>>>     not to project it upstream?
>>>     ;
>>>     -----------------------
>>>     -- 5. single UNION ALL possible
>>>     -----------------------
>>>     SELECT 1 AS a
>>>     UNION ALL
>>>     SELECT 2 AS a
>>>     UNION ALL
>>>     SELECT 3 AS a;
>>>     -- Wrong number of arguments to BeamUnionRel:
>>>     org.apache.beam.sdk.values.PCollectionList@70f145ac
>>>     -----------------------
>>>     -- 6. Reserved names
>>>     -----------------------
>>>     -- json_object
>>>     SELECT '{}' AS json_object;
>>>     -- parse failed: Incorrect syntax near the keyword 'AS' at line
>>>     1, column 13.
>>>     -- WORKAROUND SELECT '{}' AS `json_object`
>>>     -----------------------
>>>     -- 7. Windowing function on stream
>>>     -----------------------
>>>     -- in detail - How to get previous message for a key?
>>>     -- setting expiration arbitrary big is ok, but access to the
>>>     -- previous record must happen fairly quickly
>>>     -- not wait for the big window to finish and emit the expired keys.
>>>     -- Ideally would like to do it in pure beam pipeline as saving
>>>     -- to some external key/value store
>>>     -- and then reading this here could potentially result in some
>>>     -- race conditions which would be hard to debug.
>>>     DROP TABLE IF EXISTS unbounded_stream;
>>>     CREATE EXTERNAL TABLE unbounded_stream(
>>>     sequence BIGINT,
>>>     event_time TIMESTAMP
>>>     )
>>>     TYPE 'sequence'
>>>     TBLPROPERTIES '{"elementsPerSecond":1}'
>>>     ;
>>>     CREATE EXTERNAL TABLE data_1_bounded(
>>>     `sequence_nb` BIGINT,
>>>     `sender_login` VARCHAR,
>>>     `user_id` VARCHAR
>>>     )
>>>     TYPE text
>>>     LOCATION 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
>>>     TBLPROPERTIES '{"format":"json",
>>>     "deadLetterFile":"python/dbt/tests/dead_letters/dead"}'
>>>     ;
>>>     WITH
>>>     test_data_1_unbounded AS (
>>>     SELECT
>>>     sender_login,
>>>     user_id,
>>>     event_time
>>>     FROM unbounded_stream
>>>     INNER JOIN data_1_bounded
>>>     ON unbounded_stream.sequence = data_1_bounded.sequence_nb
>>>     ),
>>>     test_data_1_lookbehind AS (
>>>     SELECT
>>>     sender_login,
>>>     LAST_VALUE(user_id) OVER previous_win AS user_id
>>>     FROM test_data_1_unbounded
>>>     WINDOW previous_win AS (
>>>     PARTITION BY sender_login
>>>     ORDER BY event_time ASC
>>>     ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
>>>     )
>>>     )
>>>     SELECT *
>>>     FROM test_data_1_lookbehind
>>>     LIMIT 8
>>>     ;
>>>     -- There are not enough rules to produce a node with desired
>>>     properties: convention=ENUMERABLE. All the inputs have relevant
>>>     nodes, however the cost is still infinite.
>>>     -- Root: rel#29:RelSubset#4.ENUMERABLE
>>>     -- Original rel:
>>>     -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost =
>>>     {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
>>>     -- LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4)
>>>     OVER (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount
>>>     = 1.2, cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0
>>>     io}, id = 14
>>>     -- LogicalJoin(condition=[=($0, $2)], joinType=[inner]):
>>>     rowcount = 1.2, cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io},
>>>     id = 12
>>>     -- BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount =
>>>     1.0, cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
>>>     -- BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount =
>>>     8.0, cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
>>>     -- 
>>>     -- Sets:
>>>     -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
>>>     -- rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
>>>     -- rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam,
>>>     unbounded_stream]), rowcount=1.0, cumulative cost={1.0 rows, 1.0
>>>     cpu, 1.0 io}
>>>     -- rel#33:RelSubset#0.ENUMERABLE, best=rel#32
>>>     --
>>>     rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18),
>>>     rowcount=1.0, cumulative cost={1.7976931348623157E308 rows,
>>>     1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>>>     -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR
>>>     sender_login, VARCHAR user_id)
>>>     -- rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
>>>     -- rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam,
>>>     data_1_bounded]), rowcount=8.0, cumulative cost={8.0 rows, 8.0
>>>     cpu, 8.0 io}
>>>     -- rel#36:RelSubset#1.ENUMERABLE, best=rel#35
>>>     --
>>>     rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19),
>>>     rowcount=8.0, cumulative cost={1.7976931348623157E308 rows,
>>>     1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>>>     -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6)
>>>     event_time, BIGINT sequence_nb, VARCHAR sender_login, VARCHAR
>>>     user_id)
>>>     -- rel#21:RelSubset#2.NONE, best=null
>>>     --
>>>     rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0,
>>>     $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
>>>     -- rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4,
>>>     $0, $1, $2]), rowcount=1.2, cumulative cost={inf}
>>>     --
>>>     rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2),
>>>     rowcount=1.2, cumulative cost={inf}
>>>
>>>
>>

Re: Beam SQL found limitations

Posted by Wiśniowski Piotr <co...@gmail.com>.
Hi Kenn,

Thanks for the clarification.

1. To put an example up front: for every incoming event I need to find 
the previous event of the same user_id and pass its timestamp 
(previous_event_timestamp) down in the current event's payload. The 
current event then becomes the previous event for future events of the 
same user. The question is how to do this with BeamSQL. I am aware that 
analytic windowing (LAST_VALUE ... OVER etc.) might not be the way to go 
for streaming, and I am OK with that; it makes sense under the hood, just 
as you mention.
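
To make the requirement concrete, here is a minimal plain-Python sketch of 
the semantics I am after. It is not Beam code: the function name, the 
tuple layout and the sample data are made up for illustration, and it 
assumes events arrive in event-time order per user.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

# Hypothetical event shape: (user_id, event_time as epoch seconds).
Event = Tuple[str, int]


def attach_previous_event_time(
    events: Iterable[Event],
) -> Iterator[Tuple[str, int, Optional[int]]]:
    """Yield (user_id, event_time, previous_event_time) for each event."""
    last_seen: Dict[str, int] = {}  # per-user state: time of the latest event
    for user_id, event_time in events:
        # Emit immediately; None means this is the first event of the user.
        yield user_id, event_time, last_seen.get(user_id)
        # The current event becomes the "previous" one for later events.
        last_seen[user_id] = event_time


events = [("u1", 100), ("u2", 105), ("u1", 130), ("u1", 190)]
result = list(attach_previous_event_time(events))
```

The important property is that each event is emitted as soon as it 
arrives, with only one value of state kept per user.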

The task is to keep a simple state in streaming SQL. What I came up with 
is using a sliding window to have this state available for each new event 
that comes in.

```
WITH
unbounded_stream_initialized AS (
     SELECT
         user_id,
         event_time
     FROM unbounded_stream
     GROUP BY
         user_id,
         event_time,
         TUMBLE(event_time, INTERVAL '1' SECONDS)
     UNION ALL
     -- this is needed as first session window by default starts at
     -- first element, while here we need to start it in the past
     -- so that there is a window that ends just after first real element
     SELECT
         CAST(0 AS BIGINT) AS user_id,
         CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
     -- the UNNEST is a dummy, as the syntax does not allow
     -- GROUP BY directly after SELECT
     FROM UNNEST(ARRAY[0]) AS arr
     GROUP BY
         TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP), INTERVAL '1' SECONDS)
),
test_data_1 AS (
     SELECT
         user_id,
         MAX(event_time) AS prev_event_time,
         HOP_END(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS) AS window_end_at
     FROM unbounded_stream_initialized
     GROUP BY
         user_id,
         HOP(
             -- first create a sliding window to aggregate state
             event_time,
             INTERVAL '1' SECONDS,
             -- the idea is to have this quite long compared to the interval
             INTERVAL '7' DAYS
         )
),
test_data_1_lookup AS (
     SELECT
         user_id,
         -- aggregated, since prev_event_time is not a grouping column
         MAX(prev_event_time) AS prev_event_time
     FROM test_data_1
     GROUP BY
         user_id,
         -- then re-window into windows suitable for joining main stream
         TUMBLE(window_end_at, INTERVAL '1' SECONDS)
),
enriched_info AS (
     SELECT
         unbounded_stream_initialized.event_time AS event_timestamp,
         unbounded_stream_initialized.user_id AS user_id,
         test_data_1_lookup.prev_event_time AS prev_event_time
     FROM unbounded_stream_initialized
     LEFT JOIN test_data_1_lookup
         ON unbounded_stream_initialized.user_id = test_data_1_lookup.user_id
)
SELECT
     *
FROM enriched_info
```

My doubt is whether the above will store too much redundant data: 
`test_data_1` suggests that each incoming message could be duplicated and 
stored in every window of the sliding window definition (which might be a 
lot in this case). Or is deciding whether a message belongs to a window 
actually done later, when evaluating the `LEFT JOIN`? The target is 
Dataflow. I am still learning Beam, so I might be missing some core 
concept of how this is processed.
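
To put a number on that doubt, here is a rough plain-Python sketch that 
counts how many sliding windows a single event falls into. It only 
assumes the usual sliding-window definition (windows of a fixed size that 
start every period); the helper name and the sample timestamp are made 
up, and whether a runner such as Dataflow physically stores one copy per 
window is exactly what I do not know.

```python
from typing import List, Tuple


def sliding_windows_for(timestamp: int, size: int, period: int) -> List[Tuple[int, int]]:
    """Return all [start, end) windows of length `size`, starting every
    `period` seconds, that contain `timestamp` (all values in seconds)."""
    windows = []
    # Latest window start that is aligned to `period` and not after the event.
    start = timestamp - (timestamp % period)
    # Walk backwards while the window still covers the event.
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= period
    return windows


ONE_SECOND = 1
SEVEN_DAYS = 7 * 24 * 60 * 60  # 604800 seconds

event_time = 1_684_483_200  # arbitrary sample timestamp
windows = sliding_windows_for(event_time, size=SEVEN_DAYS, period=ONE_SECOND)
windows_per_element = len(windows)  # size / period = 604800
```

So with a 1 second period and a 7 day size, every event logically belongs 
to 604800 windows, which is why the storage cost worries me.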

2. Any hints on implementing FirestoreIOTableProvider? Just roughly how 
to do it, where to look for the important parts, etc. It seems we would 
need this functionality.

3. I will try to report some more interesting findings. If possible 
please prioritize fixing this ROW error.

Best

Piotr

On 26.05.2023 21:36, Kenneth Knowles wrote:
> Just want to clarify that Beam's concept of windowing is really an 
> event-time based key, and they are all processed logically 
> simultaneously. SQL's concept of windowing function is to sort rows 
> and process them linearly. They are actually totally different. From 
> your queries it seems you are interested in SQL's windowing functions 
> (aka analytic functions).
>
> I am surprised by the problems with rows, since we have used them 
> extensively. Hopefully it is not too hard to fix. Same with the UNION 
> ALL problem.
>
> And for the CROSS JOIN it would be a nice feature to allow in some 
> cases it seems. Should not be hard.
>
> Thank you for reporting this! If you have time it would be really 
> great to get each of these reproducible problems into GitHub issues, each.
>
> Kenn

Re: Beam SQL found limitations

Posted by Kenneth Knowles <ke...@apache.org>.
Just want to clarify that Beam's concept of windowing is really an
event-time based key, and they are all processed logically simultaneously.
SQL's concept of windowing function is to sort rows and process them
linearly. They are actually totally different. From your queries it seems
you are interested in SQL's windowing functions (aka analytic functions).
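
As a rough illustration of that difference, here is a plain-Python sketch
(not Beam or Calcite code; the function names and sample rows are made up).
The first function treats the window as part of the grouping key, derived
from each row's event time; the second sorts rows per key and walks them in
order, the way an analytic function such as LAG would.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

Row = Tuple[str, int]  # hypothetical row: (key, event_time in seconds)


def assign_tumbling_window(rows: List[Row], size: int) -> Dict[Tuple[str, int], List[int]]:
    """Beam-style view: the window is computed from event time and acts as
    an extra grouping key; no ordering between rows is needed."""
    groups: Dict[Tuple[str, int], List[int]] = defaultdict(list)
    for key, ts in rows:
        window_start = ts - (ts % size)
        groups[(key, window_start)].append(ts)
    return dict(groups)


def lag_event_time(rows: List[Row]) -> List[Tuple[str, int, Optional[int]]]:
    """SQL analytic view: sort rows per key, then process them linearly,
    carrying the previous row's event time along."""
    previous: Dict[str, int] = {}
    out = []
    for key, ts in sorted(rows):
        out.append((key, ts, previous.get(key)))
        previous[key] = ts
    return out


rows = [("a", 7), ("a", 3), ("b", 12), ("a", 11)]
grouped = assign_tumbling_window(rows, size=10)
lagged = lag_event_time(rows)
```

The grouped result never needs the rows in order, while the lagged result
depends entirely on the sort.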

I am surprised by the problems with rows, since we have used them
extensively. Hopefully it is not too hard to fix. Same with the UNION ALL
problem.

As for the CROSS JOIN, it seems it would be a nice feature to allow in some
cases. It should not be hard.

Thank you for reporting this! If you have time, it would be really great to
get each of these reproducible problems into its own GitHub issue.

Kenn

On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr <
contact.wisniowskipiotr@gmail.com> wrote:

> Hi Alexey,
>
> Thank you for the reference to that discussion. I actually have pretty
> similar thoughts on what Beam SQL needs.
>
> Update from my side:
>
> Actually did find a workaround for issue with windowing function on
> stream. It basically boils down to using sliding window to collect and
> aggregate the state. But would need an advice if this is actually a cost
> efficient method (targeting DataFlow runner). The doubt that I have is that
> this sliding window would need to have sliding interval less than 1s and
> size more than a week and be fed with quite frequent data. If I do
> understand this correctly - it would mean each input row would need to be
> duplicated for each window and stored which could be quite significant
> storage cost?
>
> Or actually Beam does not physically duplicate the record but just tracks
> to which windows the record currently belongs?
>
>
> And the real issue that BeamSQL needs at the moment in my opinion is
> fixing bugs.
>
> Some bugs that I found that prevent one from using it and would really
> appreciate fast fix:
>
> - UNNEST ARRAY with a nested ROW (described below, created ticket -
> https://github.com/apache/beam/issues/26911)
>
> - PubSub table provider actually requires all table properties to be there
> (with null in `timestampAttributeKey` it fails) - which essentially does
> not allow one to use pubsub publish timestamp as `timestampAttributeKey`.
>
> - it's not possible to cast VARCHAR to BYTES. And BYTES is needed for
> DataStoreV1TableProvider to provide a key for storage. Also consider
> updating `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it
> requires VARCHAR instead of BYTES - it's even easier in implementation.
>
> - Any hints on how to implement `FireStoreIOTableProvider`? I am
> considering implementing it and contributing depending on my team decision
> - but would like to get like idea how hard this task is.
>
> Will create tickets for the rest of issues when I will have some spare
> time.
>
> Best regards
>
> Wiśniowski Piotr
>
>
> On 22.05.2023 18:28, Alexey Romanenko wrote:
>
> Hi Piotr,
>
> Thanks for details! I cross-post this to dev@ as well since, I guess,
> people there can provide more insights on this.
>
> A while ago, I faced the similar issues trying to run Beam SQL against
> TPC-DS benchmark.
> We had a discussion around that [1], please, take a look since it can be
> helpful.
>
> [1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b
>
> —
> Alexey
>
> On 18 May 2023, at 11:36, Wiśniowski Piotr
> <co...@gmail.com>
> wrote:
>
> Hi,
>
> After experimenting with Beam SQL I did find some limitations. Testing on
> near latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`)
> with Calcite, direct runner and openjdk version "11.0.19". Please let me
> know if some of them are known/ worked on/ have tickets or have estimated
> fix time. I believe most of them are low hanging fruits or just my thinking
> is not right for the problem. If this is the case please guide me to some
> working solution.
>
>  From my perspective it is ok to have a fix just on master - no need to
> wait for release. Priority order:
> - 7. Windowing function on a stream - in detail - How to get previous
> message for a key? setting expiration arbitrary big is ok, but access to
> the previous record must happen fairly quickly not wait for the big window
> to finish and emit the expired keys. Ideally would like to do it in pure
> beam pipeline as saving to some external key/value store and then reading
> this here could potentially result in some race conditions which I would
> like to avoid, but if it's the only option - let it be.
> - 5. single UNION ALL possible
> - 4. UNNEST ARRAY with nested ROW
> - 3. Using * when there is Row type present in the schema
> - 1. `CROSS JOIN` between two unrelated tables is not supported - even if
> one is a static number table
> - 2. ROW construction not supported. It is not possible to nest data
>
> Below are the queries that I use for testing these scenarios.
>
> Thank you for looking at these topics!
>
> Best
>
> Wiśniowski Piotr
> -----------------------
> -- 1. `CROSS JOIN` between two unrelated tables is not supported.
> -----------------------
> -- Only supported is `CROSS JOIN UNNEST` when exploding array from same
> table.
> -- It is not possible to number rows
> WITH data_table AS (
> SELECT 1 AS a
> ),
> number_table AS (
> SELECT
> numbers_exploded AS number_item
> FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS
> numbers_exploded
> )
> SELECT
> data_table.a,
> number_table.number_item
> FROM data_table
> CROSS JOIN number_table
> ;
> -- CROSS JOIN, JOIN ON FALSE is not supported!
> -----------------------
> -- 2. ROW construction not supported. It is not possible to nest data
> -----------------------
> SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
> SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
> SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same
> type
> SELECT MAP['field1','b','field2','a']; -- null
> -- WORKAROUND - manually compose json string,
> -- drawback - decomposing might be not supported or would need to be also
> based on string operations
> SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS
> `json_object`;
> -----------------------
> -- 3. Using * when there is Row type present in the schema
> -----------------------
> CREATE EXTERNAL TABLE test_tmp_1(
> `ref` VARCHAR,
> `author` ROW<
> `name` VARCHAR,
> `email` VARCHAR
> >
> )
> TYPE text
> LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
> TBLPROPERTIES '{"format":"json",
> "deadLetterFile":"top/python/dbt/tests/dead"}';
> SELECT * FROM test_tmp_1;
> -- java.lang.NoSuchFieldException: name
> -- WORKAROUND - refer to columns explicitly with alias
> SELECT
> `ref` AS ref_value,
> test_tmp_1.`author`.`name` AS author_name, -- table name must be
> referenced explicitly - this could be fixed too
> test_tmp_1.`author`.`email` AS author_email
> FROM test_tmp_1;
> -----------------------
> -- 4. UNNEST ARRAY with nested ROW
> -----------------------
> CREATE EXTERNAL TABLE test_tmp(
> `ref` VARCHAR,
> `commits` ARRAY<ROW<
> `id` VARCHAR,
> `author` ROW<
> `name` VARCHAR,
> `email` VARCHAR
> >
> >>
> )
> TYPE text
> LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
> TBLPROPERTIES '{"format":"json",
> "deadLetterFile":"python/dbt/tests/dead"}';
> SELECT
> test_tmp.`ref` AS branch_name,
> commit_item.`id` AS commit_hash,
> commit_item.`author`.`name` AS author_name
> FROM test_tmp
> CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
> -- Row expected 4 fields (Field{name=ref, description=, type=STRING,
> options={{}}}, Field{name=commits, description=, type=ARRAY<ROW<id STRING,
> author ROW<name STRING, email STRING>> NOT NULL>, options={{}}},
> Field{name=id, description=, type=STRING, options={{}}}, Field{name=author,
> description=, type=ROW<name STRING, email STRING>, options={{}}}).
> initialized with 5 fields.
> -- limited WORKAROUND - refer to array elements by index and UNION ALL the
> items into rows
> -- note workaround that uses number table will not work as CROSS JOIN is
> not supported
> WITH data_parsed AS (
> SELECT
> test_tmp.`ref` AS branch_id,
> test_tmp.commits[1].`id` AS commit_hash,
> test_tmp.commits[1].`author`.`name` AS author_name
> FROM test_tmp
> UNION ALL -- this unfortunately works only for two indexes
> SELECT
> test_tmp.`ref` AS branch_id,
> test_tmp.commits[2].`id` AS commit_hash,
> test_tmp.commits[2].`author`.`name` AS author_name
> FROM test_tmp
> )
> SELECT *
> FROM data_parsed
> WHERE author_name IS NOT NULL
> ;
> -- better WORKAROUND - but tricky to get right (fragile)
> WITH data_with_number_array AS (
> SELECT
> test_tmp.`ref` AS branch_name, -- there must be some primary key in the
> data to join on later due to CROSS JOIN support limitation
> ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
> CARDINALITY(test_tmp.commits) AS commits_size
> FROM test_tmp
> ),
> data_with_numbers AS (
> SELECT
> branch_name,
> `EXPR$0` AS number_item
> FROM data_with_number_array
> CROSS JOIN UNNEST(data_with_number_array.number_array) AS exploded
> WHERE `EXPR$0` <= commits_size
> ),
> data_exploded AS (
> SELECT
> test_tmp.`ref` AS branch_name,
> test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
> test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS
> author_name
> FROM test_tmp
> INNER JOIN data_with_numbers
> ON data_with_numbers.branch_name = test_tmp.`ref`
> )
> SELECT
> branch_name,
> commit_hash,
> author_name
> FROM data_exploded
> -- WHERE author_name IS NOT NULL - not possible here due to `Non equi-join
> is not supported`
> -- as it pushes this condition as predicate pushdown to join.
> -- Is there any way to force checking this condition on here and not to
> project it upstream?
> ;
> -----------------------
> -- 5. single UNION ALL possible
> -----------------------
> SELECT 1 AS a
> UNION ALL
> SELECT 2 AS a
> UNION ALL
> SELECT 3 AS a;
> -- Wrong number of arguments to BeamUnionRel:
> org.apache.beam.sdk.values.PCollectionList@70f145ac
> -----------------------
> -- 6. Reserved names
> -----------------------
> -- json_object
> SELECT '{}' AS json_object;
> -- parse failed: Incorrect syntax near the keyword 'AS' at line 1, column
> 13.
> -- WORKAROUND SELECT '{}' AS `json_object`
> -----------------------
> -- 7. Windowing function on stream
> -----------------------
> -- in detail - How to get previous message for a key?
> -- setting expiration arbitrary big is ok, but access to the previous
> record must happen fairly quickly
> -- not wait for the big window to finish and emit the expired keys.
> -- Ideally would like to do it in pure beam pipeline as saving to some
> external key/value store
> -- and then reading this here could potentially result in some race
> conditions which would be hard to debug.
> DROP TABLE IF EXISTS unbounded_stream;
> CREATE EXTERNAL TABLE unbounded_stream(
> sequence BIGINT,
> event_time TIMESTAMP
> )
> TYPE 'sequence'
> TBLPROPERTIES '{"elementsPerSecond":1}'
> ;
> CREATE EXTERNAL TABLE data_1_bounded(
> `sequence_nb` BIGINT,
> `sender_login` VARCHAR,
> `user_id` VARCHAR
> )
> TYPE text
> LOCATION
> 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
> TBLPROPERTIES '{"format":"json",
> "deadLetterFile":"python/dbt/tests/dead_letters/dead"}'
> ;
> WITH
> test_data_1_unbounded AS (
> SELECT
> sender_login,
> user_id,
> event_time
> FROM unbounded_stream
> INNER JOIN data_1_bounded
> ON unbounded_stream.sequence = data_1_bounded.sequence_nb
> ),
> test_data_1_lookbehind AS (
> SELECT
> sender_login,
> LAST_VALUE(user_id) OVER previous_win AS user_id
> FROM test_data_1_unbounded
> WINDOW previous_win AS (
> PARTITION BY sender_login
> ORDER BY event_time ASC
> ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
> )
> )
> SELECT *
> FROM test_data_1_lookbehind
> LIMIT 8
> ;
> -- There are not enough rules to produce a node with desired properties:
> convention=ENUMERABLE. All the inputs have relevant nodes, however the cost
> is still infinite.
> -- Root: rel#29:RelSubset#4.ENUMERABLE
> -- Original rel:
> -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost =
> {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
> -- LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4) OVER
> (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount = 1.2,
> cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
> -- LogicalJoin(condition=[=($0, $2)], joinType=[inner]): rowcount = 1.2,
> cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io}, id = 12
> -- BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount = 1.0,
> cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
> -- BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount = 8.0,
> cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
> --
> -- Sets:
> -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
> -- rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
> -- rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, unbounded_stream]),
> rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 1.0 io}
> -- rel#33:RelSubset#0.ENUMERABLE, best=rel#32
> -- rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18),
> rowcount=1.0, cumulative cost={1.7976931348623157E308 rows,
> 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR sender_login,
> VARCHAR user_id)
> -- rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
> -- rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, data_1_bounded]),
> rowcount=8.0, cumulative cost={8.0 rows, 8.0 cpu, 8.0 io}
> -- rel#36:RelSubset#1.ENUMERABLE, best=rel#35
> -- rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19),
> rowcount=8.0, cumulative cost={1.7976931348623157E308 rows,
> 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time,
> BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
> -- rel#21:RelSubset#2.NONE, best=null
> --
> rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0,
> $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
> -- rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4, $0, $1,
> $2]), rowcount=1.2, cumulative cost={inf}
> --
> rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2),
> rowcount=1.2, cumulative cost={inf}
>
>
>
>

Re: Beam SQL found limitations

Posted by Kenneth Knowles <ke...@apache.org>.
Just want to clarify that Beam's concept of windowing is really an
event-time based key, and all windows are processed logically simultaneously.
SQL's concept of a windowing function is to sort rows and process them
linearly. The two are actually totally different. From your queries it seems
you are interested in SQL's windowing functions (aka analytic functions).
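
To make the distinction concrete, here is a minimal plain-Python sketch (not Beam or Calcite code). The function names and the sample rows are made up for illustration. The first part treats a window as an extra grouping key derived from event time; the second sorts each partition and walks it linearly, which is what the LAST_VALUE(...) OVER (... ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) query in item 7 asks for.

```python
from collections import defaultdict


def fixed_window_key(event_time_s, size_s):
    """Beam-style view: the window is an extra key derived from event time."""
    start = (event_time_s // size_s) * size_s
    return (start, start + size_s)


def group_by_key_and_window(rows, size_s):
    """Group (key, event_time_s, value) rows by (key, window).

    Each group is independent of the others; no ordering between rows is implied.
    """
    groups = defaultdict(list)
    for key, event_time_s, value in rows:
        groups[(key, fixed_window_key(event_time_s, size_s))].append(value)
    return dict(groups)


def previous_value_per_key(rows):
    """SQL analytic view: sort each partition by event time, then walk it linearly.

    Emits (key, event_time_s, previous_value); the first row of a partition gets None.
    """
    partitions = defaultdict(list)
    for key, event_time_s, value in rows:
        partitions[key].append((event_time_s, value))
    out = []
    for key, items in partitions.items():
        previous = None
        for event_time_s, value in sorted(items):
            out.append((key, event_time_s, previous))
            previous = value
    return out


# Hypothetical sample data: (sender_login, event time in seconds, user_id)
rows = [("alice", 5, "u1"), ("alice", 65, "u2"), ("bob", 10, "u9"), ("alice", 20, "u3")]
print(group_by_key_and_window(rows, 60))
print(previous_value_per_key(rows))
```

The grouping version never needs to know which row came "before" another, while the analytic version cannot produce a row's result without the ordered history of its partition.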

I am surprised by the problems with rows, since we have used them
extensively. Hopefully it is not too hard to fix. Same with the UNION ALL
problem.

As for the CROSS JOIN, it seems it would be a nice feature to allow in some
cases. It should not be hard.

Thank you for reporting this! If you have time it would be really great to
get each of these reproducible problems into its own GitHub issue.

Kenn

On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr <
contact.wisniowskipiotr@gmail.com> wrote:

> Hi Alexey,
>
> Thank you for the reference to that discussion. I have pretty similar
> thoughts on what Beam SQL needs.
>
> Update from my side:
>
> I did find a workaround for the issue with the windowing function on a
> stream. It basically boils down to using a sliding window to collect and
> aggregate the state. But I would need advice on whether this is actually a
> cost-efficient method (targeting the Dataflow runner). My doubt is that
> this sliding window would need a sliding interval of less than 1s and a
> size of more than a week, and would be fed with quite frequent data. If I
> understand this correctly, each input row would need to be duplicated for
> each window and stored, which could be a quite significant storage cost?
>
> Or does Beam not physically duplicate the record, but just track which
> windows the record currently belongs to?
>
>
> In my opinion, what Beam SQL really needs at the moment is bug fixes.
>
> Some bugs that I found prevent one from using it, and I would really
> appreciate a fast fix:
>
> - UNNEST ARRAY with a nested ROW (described below, created ticket -
> https://github.com/apache/beam/issues/26911)
>
> - The PubSub table provider requires all table properties to be present
> (it fails with null in `timestampAttributeKey`), which essentially does
> not allow one to use the PubSub publish timestamp as `timestampAttributeKey`.
>
> - It is not possible to cast VARCHAR to BYTES, and BYTES is needed for
> DataStoreV1TableProvider to provide a key for storage. Also consider
> updating `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it
> requires VARCHAR instead of BYTES - it is even easier to implement.
>
> - Any hints on how to implement `FireStoreIOTableProvider`? I am
> considering implementing and contributing it, depending on my team's
> decision, but would like to get an idea of how hard this task is.
>
> I will create tickets for the rest of the issues when I have some spare
> time.
>
> Best regards
>
> Wiśniowski Piotr
>
>
> On 22.05.2023 18:28, Alexey Romanenko wrote:
>
> Hi Piotr,
>
> Thanks for details! I cross-post this to dev@ as well since, I guess,
> people there can provide more insights on this.
>
> A while ago, I faced the similar issues trying to run Beam SQL against
> TPC-DS benchmark.
> We had a discussion around that [1], please, take a look since it can be
> helpful.
>
> [1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b
>
> —
> Alexey
>

Re: Beam SQL found limitations

Posted by Wiśniowski Piotr <co...@gmail.com>.
Hi Alexey,

Thank you for the reference to that discussion. I have pretty similar
thoughts on what Beam SQL needs.

Update from my side:

I did find a workaround for the issue with the windowing function on a
stream. It basically boils down to using a sliding window to collect and
aggregate the state. But I would need advice on whether this is actually a
cost-efficient method (targeting the Dataflow runner). My doubt is that
this sliding window would need a sliding interval of less than 1s and a
size of more than a week, and would be fed with quite frequent data. If I
understand this correctly, each input row would need to be duplicated for
each window and stored, which could be a quite significant storage cost?

Or does Beam not physically duplicate the record, but just track which
windows the record currently belongs to?
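
To put a number on the concern, here is a small plain-Python sketch of the arithmetic (not Beam code). The helper names are made up, and window starts are assumed to be aligned to multiples of the sliding interval. It only counts how many sliding windows an element logically belongs to. It says nothing about how a particular runner stores them.

```python
def sliding_windows_for(event_time_s, size_s, period_s):
    """Return every [start, end) window that contains event_time_s.

    Windows have length size_s and a new one starts every period_s seconds
    (assumption: starts are aligned to multiples of period_s).
    """
    start = (event_time_s // period_s) * period_s  # latest window start <= timestamp
    windows = []
    while start > event_time_s - size_s:  # window still covers the timestamp
        windows.append((start, start + size_s))
        start -= period_s
    return windows


def windows_per_element(size_s, period_s):
    """Number of windows each element falls into: ceil(size / period)."""
    return -(-size_s // period_s)


# Small case that is easy to check by hand: 10 s windows sliding every 5 s.
print(sliding_windows_for(12, size_s=10, period_s=5))

# The case from this message: one-week windows sliding every second.
one_week_s = 7 * 24 * 60 * 60
print(windows_per_element(one_week_s, 1))
```

With a one-week size and a 1 s interval, every record is logically a member of 604800 windows, and an interval below 1 s only increases that count, so the cost question is a fair one for this workaround.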


In my opinion, what Beam SQL really needs at the moment is bug fixes.

Some bugs that I found prevent one from using it, and I would really
appreciate a fast fix:

- UNNEST ARRAY with a nested ROW (described below, created ticket - 
https://github.com/apache/beam/issues/26911)

- The PubSub table provider requires all table properties to be present
(it fails with null in `timestampAttributeKey`), which essentially does
not allow one to use the PubSub publish timestamp as `timestampAttributeKey`.

- It is not possible to cast VARCHAR to BYTES, and BYTES is needed for
DataStoreV1TableProvider to provide a key for storage. Also consider
updating `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it
requires VARCHAR instead of BYTES - it is even easier to implement.

- Any hints on how to implement `FireStoreIOTableProvider`? I am
considering implementing and contributing it, depending on my team's
decision, but would like to get an idea of how hard this task is.

I will create tickets for the rest of the issues when I have some spare time.

Best regards

Wiśniowski Piotr


On 22.05.2023 18:28, Alexey Romanenko wrote:
> Hi Piotr,
>
> Thanks for details! I cross-post this to dev@ as well since, I guess, 
> people there can provide more insights on this.
>
> A while ago, I faced the similar issues trying to run Beam SQL against 
> TPC-DS benchmark.
> We had a discussion around that [1], please, take a look since it can 
> be helpful.
>
> [1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b
>
> —
> Alexey
>
>> On 18 May 2023, at 11:36, Wiśniowski Piotr 
>> <co...@gmail.com> wrote:
>>
>> HI,
>>
>> After experimenting with Beam SQL I did find some limitations. 
>> Testing on near latest main (precisely 
>> `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite, direct 
>> runner and openjdk version "11.0.19". Please let me know if some of 
>> them are known/ worked on/ have tickets or have estimated fix time. I 
>> believe most of them are low hanging fruits or just my thinking is 
>> not right for the problem. If this is the case please guide me to 
>> some working solution.
>>
>>  From my perspective it is ok to have a fix just on master - no need 
>> to wait for release. Priority order:
>> - 7. Windowing function on a stream - in detail - How to get previous 
>> message for a key? setting expiration arbitrary big is ok, but access 
>> to the previous record must happen fairly quickly not wait for the 
>> big window to finish and emit the expired keys. Ideally would like to 
>> do it in pure beam pipeline as saving to some external key/value 
>> store and then reading this here could potentially result in some 
>> race conditions which in I would like to avoid, but if its the only 
>> option - let it be.
>> - 5. single UNION ALL possible
>> - 4. UNNEST ARRAY with nested ROW
>> - 3. Using * when there is Row type present in the schema
>> - 1. `CROSS JOIN` between two unrelated tables is not supported - 
>> even if one is a static number table
>> - 2. ROW construction not supported. It is not possible to nest data
>>
>> Below queries tat I use to testing this scenarios.
>>
>> Thank You for looking at this topics!
>>
>> Best
>>
>> Wiśniowski Piotr
>>
>> -----------------------
>> -- 1. `CROSS JOIN` between two unrelated tables is not supported.
>> -----------------------
>> -- Only supported is `CROSS JOIN UNNEST` when exploding array from 
>> same table.
>> -- It is not possible to number rows
>> WITH data_table AS (
>> SELECT 1 AS a
>> ),
>> number_table AS (
>> SELECT
>> numbers_exploded AS number_item
>> FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS
>> numbers_exploded
>> )
>> SELECT
>> data_table.a,
>> number_table.number_item
>> FROM data_table
>> CROSS JOIN number_table
>> ;
>> -- CROSS JOIN, JOIN ON FALSE is not supported!
>> -----------------------
>> -- 2. ROW construction not supported. It is not possible to nest data
>> -----------------------
>> SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>> SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
>> SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
>> SELECT MAP['field1','b','field2','a']; -- null
>> -- WORKAROUND - manually compose json string,
>> -- drawback - decomposing might be not supported or would need to be
>> -- also based on string operations
>> SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS `json_object`;
>> -----------------------
>> -- 3. Using * when there is Row type present in the schema
>> -----------------------
>> CREATE EXTERNAL TABLE test_tmp_1(
>> `ref` VARCHAR,
>> `author` ROW<
>> `name` VARCHAR,
>> `email` VARCHAR
>> >
>> )
>> TYPE text
>> LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
>> TBLPROPERTIES '{"format":"json",
>> "deadLetterFile":"top/python/dbt/tests/dead"}';
>> SELECT * FROM test_tmp_1;
>> -- java.lang.NoSuchFieldException: name
>> -- WORKAROUND - refer to columns explicitly with alias
>> SELECT
>> `ref` AS ref_value,
>> -- table name must be referenced explicitly - this could be fixed too
>> test_tmp_1.`author`.`name` AS author_name,
>> test_tmp_1.`author`.`email` AS author_name
>> FROM test_tmp_1;
>> -----------------------
>> -- 4. UNNEST ARRAY with nested ROW
>> -----------------------
>> CREATE EXTERNAL TABLE test_tmp(
>> `ref` VARCHAR,
>> `commits` ARRAY<ROW<
>> `id` VARCHAR,
>> `author` ROW<
>> `name` VARCHAR,
>> `email` VARCHAR
>> >
>> >>
>> )
>> TYPE text
>> LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
>> TBLPROPERTIES '{"format":"json",
>> "deadLetterFile":"python/dbt/tests/dead"}';
>> SELECT
>> test_tmp.`ref` AS branch_name,
>> commit_item.`id` AS commit_hash,
>> commit_item.`author`.`name` AS author_name
>> FROM test_tmp
>> CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
>> -- Row expected 4 fields (Field{name=ref, description=, type=STRING, 
>> options={{}}}, Field{name=commits, description=, type=ARRAY<ROW<id 
>> STRING, author ROW<name STRING, email STRING>> NOT NULL>, 
>> options={{}}}, Field{name=id, description=, type=STRING, 
>> options={{}}}, Field{name=author, description=, type=ROW<name STRING, 
>> email STRING>, options={{}}}). initialized with 5 fields.
>> -- limited WORKAROUND - refer to array elements by index and UNION 
>> ALL the items into rows
>> -- note workaround that uses number table will not work as CROSS JOIN 
>> is not supported
>> WITH data_parsed AS (
>> SELECT
>> test_tmp.`ref` AS branch_id,
>> test_tmp.commits[1].`id` AS commit_hash,
>> test_tmp.commits[1].`author`.`name` AS author_name
>> FROM test_tmp
>> UNION ALL -- this unfortunately works only for two indexes
>> SELECT
>> test_tmp.`ref` AS branch_id,
>> test_tmp.commits[2].`id` AS commit_hash,
>> test_tmp.commits[2].`author`.`name` AS author_name
>> FROM test_tmp
>> )
>> SELECT *
>> FROM data_parsed
>> WHERE author_name IS NOT NULL
>> ;
>> -- better WORKAROUND - but tricky to get right (fragile)
>> WITH data_with_number_array AS (
>> SELECT
>> -- there must be some primary key in the data to join on later
>> -- due to CROSS JOIN support limitation
>> test_tmp.`ref` AS branch_name,
>> ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
>> CARDINALITY(test_tmp.commits) AS commits_size
>> FROM test_tmp
>> ),
>> data_with_numbers AS (
>> SELECT
>> branch_name,
>> `EXPR$0` AS number_item
>> FROM data_with_number_array
>> CROSS JOIN UNNEST(data_with_number_array.number_array) AS exploded
>> WHERE `EXPR$0` <= commits_size
>> ),
>> data_exploded AS (
>> SELECT
>> test_tmp.`ref` AS branch_name,
>> test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
>> test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS
>> author_name
>> FROM test_tmp
>> INNER JOIN data_with_numbers
>> ON data_with_numbers.branch_name = test_tmp.`ref`
>> )
>> SELECT
>> branch_name,
>> commit_hash,
>> author_name
>> FROM data_exploded
>> -- WHERE author_name IS NOT NULL - not possible here due to `Non 
>> equi-join is not supported`
>> -- as it pushes this condition as predicate pushdown to join.
>> -- Is there any way to force checking this condition on here and not 
>> to project it upstream?
>> ;
>> -----------------------
>> -- 5. single UNION ALL possible
>> -----------------------
>> SELECT 1 AS a
>> UNION ALL
>> SELECT 2 AS a
>> UNION ALL
>> SELECT 3 AS a;
>> -- Wrong number of arguments to BeamUnionRel: 
>> org.apache.beam.sdk.values.PCollectionList@70f145ac
>> -----------------------
>> -- 6. Reserved names
>> -----------------------
>> -- json_object
>> SELECT '{}' AS json_object;
>> -- parse failed: Incorrect syntax near the keyword 'AS' at line 1, 
>> column 13.
>> -- WORKAROUND SELECT '{}' AS `json_object`
>> -----------------------
>> -- 7. Windowing function on stream
>> -----------------------
>> -- in detail - How to get previous message for a key?
>> -- setting expiration arbitrary big is ok, but access to the previous 
>> record must happen fairly quickly
>> -- not wait for the big window to finish and emit the expired keys.
>> -- Ideally would like to do it in pure beam pipeline as saving to 
>> some external key/value store
>> -- and then reading this here could potentially result in some race 
>> conditions which would be hard to debug.
>> DROP TABLE IF EXISTS unbounded_stream;
>> CREATE EXTERNAL TABLE unbounded_stream(
>> sequence BIGINT,
>> event_time TIMESTAMP
>> )
>> TYPE 'sequence'
>> TBLPROPERTIES '{"elementsPerSecond":1}'
>> ;
>> CREATE EXTERNAL TABLE data_1_bounded(
>> `sequence_nb` BIGINT,
>> `sender_login` VARCHAR,
>> `user_id` VARCHAR
>> )
>> TYPE text
>> LOCATION 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
>> TBLPROPERTIES '{"format":"json",
>> "deadLetterFile":"python/dbt/tests/dead_letters/dead"}'
>> ;
>> WITH
>> test_data_1_unbounded AS (
>> SELECT
>> sender_login,
>> user_id,
>> event_time
>> FROM unbounded_stream
>> INNER JOIN data_1_bounded
>> ON unbounded_stream.sequence = data_1_bounded.sequence_nb
>> ),
>> test_data_1_lookbehind AS (
>> SELECT
>> sender_login,
>> LAST_VALUE(user_id) OVER previous_win AS user_id
>> FROM test_data_1_unbounded
>> WINDOW previous_win AS (
>> PARTITION BY sender_login
>> ORDER BY event_time ASC
>> ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
>> )
>> )
>> SELECT *
>> FROM test_data_1_lookbehind
>> LIMIT 8
>> ;
>> -- There are not enough rules to produce a node with desired 
>> properties: convention=ENUMERABLE. All the inputs have relevant 
>> nodes, however the cost is still infinite.
>> -- Root: rel#29:RelSubset#4.ENUMERABLE
>> -- Original rel:
>> -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost = 
>> {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
>> -- LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4) OVER 
>> (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount = 1.2, 
>> cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
>> -- LogicalJoin(condition=[=($0, $2)], joinType=[inner]): rowcount = 
>> 1.2, cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io}, id = 12
>> -- BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount = 1.0, 
>> cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
>> -- BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount = 8.0, 
>> cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
>> -- 
>> -- Sets:
>> -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
>> -- rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
>> -- rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, 
>> unbounded_stream]), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 
>> 1.0 io}
>> -- rel#33:RelSubset#0.ENUMERABLE, best=rel#32
>> -- rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18), 
>> rowcount=1.0, cumulative cost={1.7976931348623157E308 rows, 
>> 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>> -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR sender_login, 
>> VARCHAR user_id)
>> -- rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
>> -- rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, data_1_bounded]), 
>> rowcount=8.0, cumulative cost={8.0 rows, 8.0 cpu, 8.0 io}
>> -- rel#36:RelSubset#1.ENUMERABLE, best=rel#35
>> -- rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19), 
>> rowcount=8.0, cumulative cost={1.7976931348623157E308 rows, 
>> 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
>> -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time, 
>> BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
>> -- rel#21:RelSubset#2.NONE, best=null
>> -- 
>> rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0, 
>> $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
>> -- rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4, $0, 
>> $1, $2]), rowcount=1.2, cumulative cost={inf}
>> -- 
>> rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2), 
>> rowcount=1.2, cumulative cost={inf}
>>
>>
>

Re: Beam SQL found limitations

Posted by Wiśniowski Piotr <co...@gmail.com>.
Hi Alexey,

Thank you for the reference to that discussion. I do actually have pretty 
similar thoughts on what Beam SQL needs.

Update from my side:

I did find a workaround for the issue with the windowing function on a 
stream. It basically boils down to using a sliding window to collect and 
aggregate the state. But I would need advice on whether this is actually 
a cost-efficient method (targeting the Dataflow runner). The doubt I have 
is that this sliding window would need to have a sliding interval of less 
than 1s and a size of more than a week, and be fed with quite frequent 
data. If I understand this correctly, it would mean each input row would 
need to be duplicated for each window and stored, which could be quite a 
significant storage cost?

Or does Beam actually not physically duplicate the record, but just 
track which windows the record currently belongs to?
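
To put a rough number on this concern, here is a back-of-the-envelope 
sketch (illustrative only, not Beam code). It assumes that with sliding 
windows each element logically belongs to about size / period windows; 
the helper name `windows_per_element` is hypothetical, and the one-week 
size and 1s period are simply the figures from the scenario above. 
Whether a runner physically stores one copy per window is 
runner-specific and is not settled by this arithmetic.

```python
from datetime import timedelta


def windows_per_element(size: timedelta, period: timedelta) -> int:
    """Upper bound on how many sliding windows contain a single element.

    Windows of length `size` start every `period`, so a timestamp is
    covered by at most ceil(size / period) of them (exactly size / period
    when size is a multiple of period).
    """
    if size <= timedelta(0) or period <= timedelta(0):
        raise ValueError("size and period must be positive")
    # Ceiling division on timedeltas: floor-divide the negated size,
    # then negate the integer result.
    return -(-size // period)


week = timedelta(weeks=1)
one_second = timedelta(seconds=1)

# Logical fan-out of one input row for a 1-week window sliding every 1s.
copies = windows_per_element(week, one_second)
print(copies)
```

With these figures the logical fan-out per row is in the hundreds of 
thousands, which is why the question of physical duplication matters 
for cost.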


In my opinion, what Beam SQL really needs at the moment is bug fixes.

Some bugs I found that prevent one from using it, and for which I would 
really appreciate a fast fix:

- UNNEST ARRAY with a nested ROW (described below, created ticket - 
https://github.com/apache/beam/issues/26911)

- PubSub table provider actually requires all table properties to be 
there (with null in `timestampAttributeKey` it fails) - which 
essentially does not allow one to use pubsub publish timestamp as 
`timestampAttributeKey`.

- It is not possible to cast VARCHAR to BYTES, and BYTES is needed for 
DataStoreV1TableProvider to provide a key for storage. Also consider 
updating `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it 
requires VARCHAR instead of BYTES - it is even easier to implement.

- Any hints on how to implement `FireStoreIOTableProvider`? I am 
considering implementing and contributing it, depending on my team's 
decision - but I would like to get an idea of how hard this task is.

I will create tickets for the rest of the issues when I have some spare time.

Best regards

Wiśniowski Piotr


On 22.05.2023 18:28, Alexey Romanenko wrote:
> Hi Piotr,
>
> Thanks for details! I cross-post this to dev@ as well since, I guess, 
> people there can provide more insights on this.
>
> A while ago, I faced the similar issues trying to run Beam SQL against 
> TPC-DS benchmark.
> We had a discussion around that [1], please, take a look since it can 
> be helpful.
>
> [1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b
>
> —
> Alexey

Re: Beam SQL found limitations

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Piotr,

Thanks for the details! I am cross-posting this to dev@ as well since, I guess, people there can provide more insights on this.

A while ago, I faced similar issues trying to run Beam SQL against the TPC-DS benchmark. 
We had a discussion around that [1]; please take a look, since it may be helpful.

[1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b

—
Alexey 

> On 18 May 2023, at 11:36, Wiśniowski Piotr <co...@gmail.com> wrote:
> 
> Hi,
> 
> After experimenting with Beam SQL I found some limitations. I tested on near-latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite, the direct runner and openjdk version "11.0.19". Please let me know if some of them are known, being worked on, have tickets or have an estimated fix time. I believe most of them are low-hanging fruit, or my thinking is just not right for the problem. If this is the case, please guide me to a working solution.
> 
> From my perspective it is ok to have a fix just on master - no need to wait for a release. Priority order: 
> - 7. Windowing function on a stream - in detail - How to get the previous message for a key? Setting an arbitrarily big expiration is ok, but access to the previous record must happen fairly quickly, not wait for the big window to finish and emit the expired keys. Ideally I would like to do it in a pure Beam pipeline, as saving to some external key/value store and then reading it back here could potentially result in race conditions, which I would like to avoid, but if it is the only option - let it be.
> - 5. single UNION ALL possible
> - 4. UNNEST ARRAY with nested ROW
> - 3. Using * when there is Row type present in the schema
> - 1. `CROSS JOIN` between two unrelated tables is not supported - even if one is a static number table
> - 2. ROW construction not supported. It is not possible to nest data
> 
> Below are the queries that I use to test these scenarios.
> 
> Thank you for looking at these topics!
> 
> Best
> 
> Wiśniowski Piotr
> 
> -----------------------
> -- 1. `CROSS JOIN` between two unrelated tables is not supported. 
> -----------------------
> -- Only supported is `CROSS JOIN UNNEST` when exploding array from same table.
> -- It is not possible to number rows
> WITH data_table AS (
>     SELECT 1 AS a
> ),
> number_table AS (
>     SELECT 
>         numbers_exploded AS number_item
>     FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS numbers_exploded
> )
> SELECT 
>     data_table.a,
>     number_table.number_item
> FROM data_table
> CROSS JOIN number_table
> ;
> -- CROSS JOIN, JOIN ON FALSE is not supported!
> 
> 
> -----------------------
> -- 2. ROW construction not supported. It is not possible to nest data
> -----------------------
> SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0 
> SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0 
> SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
> SELECT MAP['field1','b','field2','a']; -- null
> -- WORKAROUND - manually compose json string, 
> -- drawback - decomposing might be not supported or would need to be also based on string operations
> SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS `json_object`;
> 
> 
> -----------------------
> -- 3. Using * when there is Row type present in the schema
> -----------------------
> CREATE EXTERNAL TABLE test_tmp_1(
>     `ref` VARCHAR,
>     `author` ROW<
>         `name` VARCHAR,
>         `email` VARCHAR
>     >
> )
> TYPE text
> LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
> TBLPROPERTIES '{"format":"json", "deadLetterFile":"top/python/dbt/tests/dead"}';
> SELECT * FROM test_tmp_1;
> --  java.lang.NoSuchFieldException: name 
> -- WORKAROUND - refer to columns explicitly with alias
> SELECT 
>     `ref` AS ref_value, 
>     test_tmp_1.`author`.`name` AS author_name, -- table name must be referenced explicitly - this could be fixed too
>     test_tmp_1.`author`.`email` AS author_email
> FROM test_tmp_1;
> 
> 
> -----------------------
> -- 4. UNNEST ARRAY with nested ROW
> -----------------------
> CREATE EXTERNAL TABLE test_tmp(
>     `ref` VARCHAR,
>     `commits` ARRAY<ROW<
>         `id` VARCHAR,
>         `author` ROW<
>             `name` VARCHAR,
>             `email` VARCHAR
>         >
>     >>
> )
> TYPE text
> LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
> TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead"}';
> SELECT
>     test_tmp.`ref` AS branch_name,
>     commit_item.`id` AS commit_hash,
>     commit_item.`author`.`name` AS author_name
> FROM test_tmp
> CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
> -- Row expected 4 fields (Field{name=ref, description=, type=STRING, options={{}}}, Field{name=commits, description=, type=ARRAY<ROW<id STRING, author ROW<name STRING, email STRING>> NOT NULL>, options={{}}}, Field{name=id, description=, type=STRING, options={{}}}, Field{name=author, description=, type=ROW<name STRING, email STRING>, options={{}}}). initialized with 5 fields.
> -- limited WORKAROUND - refer to array elements by index and UNION ALL the items into rows
> -- note workaround that uses number table will not work as CROSS JOIN is not supported
> WITH data_parsed AS (
>     SELECT
>         test_tmp.`ref` AS branch_id,
>         test_tmp.commits[1].`id` AS commit_hash,
>         test_tmp.commits[1].`author`.`name` AS author_name
>     FROM test_tmp
>     UNION ALL -- this unfortunately works only for two indexes
>     SELECT
>         test_tmp.`ref` AS branch_id,
>         test_tmp.commits[2].`id` AS commit_hash,
>         test_tmp.commits[2].`author`.`name` AS author_name
>     FROM test_tmp
> )
> SELECT * 
> FROM data_parsed
> WHERE author_name IS NOT NULL
> ;
> -- better WORKAROUND - but tricky to get right (fragile)
> WITH data_with_number_array AS (
>     SELECT
>         test_tmp.`ref` AS branch_name, -- there must be some primary key in the data to join on later due to CROSS JOIN support limitation
>         ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
>         CARDINALITY(test_tmp.commits) AS commits_size
>     FROM test_tmp
> ),
> data_with_numbers AS (
>     SELECT 
>         branch_name,
>         `EXPR$0` AS number_item
>     FROM data_with_number_array
>     CROSS JOIN UNNEST(data_with_number_array.number_array) AS exploded
>     WHERE `EXPR$0` <= commits_size
> ),
> data_exploded AS (
>     SELECT 
>         test_tmp.`ref` AS branch_name,
>         test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
>         test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS author_name
>     FROM test_tmp
>     INNER JOIN data_with_numbers
>         ON data_with_numbers.branch_name = test_tmp.`ref`
> )
> SELECT
>     branch_name,
>     commit_hash,
>     author_name
> FROM data_exploded
> -- WHERE author_name IS NOT NULL - not possible here due to `Non equi-join is not supported`
> -- as it pushes this condition as predicate pushdown to join. 
> -- Is there any way to force this condition to be checked here and not pushed upstream?
> ;
> 
> 
> -----------------------
> -- 5. single UNION ALL possible
> -----------------------
> SELECT 1 AS a
> UNION ALL
> SELECT 2 AS a
> UNION ALL
> SELECT 3 AS a;
> -- Wrong number of arguments to BeamUnionRel: org.apache.beam.sdk.values.PCollectionList@70f145ac
> 
> 
> -----------------------
> -- 6. Reserved names
> -----------------------
> -- json_object
> SELECT '{}' AS json_object; 
> -- parse failed: Incorrect syntax near the keyword 'AS' at line 1, column 13.
> -- WORKAROUND SELECT '{}' AS `json_object`
> 
> -----------------------
> -- 7. Windowing function on stream
> -----------------------
> -- in detail - How to get the previous message for a key?
> -- Setting an arbitrarily big expiration is ok, but access to the previous record must happen fairly quickly, 
> -- not wait for the big window to finish and emit the expired keys. 
> -- Ideally I would like to do it in a pure Beam pipeline, as saving to some external key/value store
> -- and then reading it back here could potentially result in race conditions, which would be hard to debug.
> DROP TABLE IF EXISTS unbounded_stream;
> CREATE EXTERNAL TABLE unbounded_stream(
>     sequence BIGINT,
>     event_time TIMESTAMP
> )
> TYPE 'sequence'
> TBLPROPERTIES '{"elementsPerSecond":1}'
> ;
> CREATE EXTERNAL TABLE data_1_bounded(
>     `sequence_nb` BIGINT,
>     `sender_login` VARCHAR,
>     `user_id` VARCHAR
> )
> TYPE text
> LOCATION 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
> TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead_letters/dead"}' 
> ;
> WITH
> test_data_1_unbounded AS (
>     SELECT 
>         sender_login,
>         user_id,
>         event_time
>     FROM unbounded_stream
>     INNER JOIN data_1_bounded
>        ON unbounded_stream.sequence = data_1_bounded.sequence_nb
> ),
> test_data_1_lookbehind AS (
>     SELECT 
>         sender_login,
>         LAST_VALUE(user_id) OVER previous_win AS user_id
>     FROM test_data_1_unbounded
>     WINDOW previous_win AS (
>         PARTITION BY sender_login
>         ORDER BY event_time ASC
>         ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
>     )
> )
> SELECT * 
> FROM test_data_1_lookbehind 
> LIMIT 8
> ;
> -- There are not enough rules to produce a node with desired properties: convention=ENUMERABLE. All the inputs have relevant nodes, however the cost is still infinite.
> -- Root: rel#29:RelSubset#4.ENUMERABLE
> -- Original rel:
> -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost = {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
> --   LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4) OVER (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount = 1.2, cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
> --     LogicalJoin(condition=[=($0, $2)], joinType=[inner]): rowcount = 1.2, cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io}, id = 12
> --       BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
> --       BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount = 8.0, cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
> -- 
> -- Sets:
> -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
> --         rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
> --                 rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, unbounded_stream]), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 1.0 io}
> --         rel#33:RelSubset#0.ENUMERABLE, best=rel#32
> --                 rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18), rowcount=1.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
> --         rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
> --                 rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, data_1_bounded]), rowcount=8.0, cumulative cost={8.0 rows, 8.0 cpu, 8.0 io}
> --         rel#36:RelSubset#1.ENUMERABLE, best=rel#35
> --                 rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19), rowcount=8.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time, BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
> --         rel#21:RelSubset#2.NONE, best=null
> --                 rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0, $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
> --                 rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4, $0, $1, $2]), rowcount=1.2, cumulative cost={inf}
> --                 rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2), rowcount=1.2, cumulative cost={inf}
> 
> 
> 
> 
> 

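The intent of query 7 quoted above (for each `sender_login`, return the 
`user_id` of the previous row in `event_time` order) can be written down 
in plain Python. This is only a sketch of the expected semantics, not 
Beam code: the sample rows and the function name `previous_user_id` are 
made up for illustration, and it assumes all rows can be sorted by 
`event_time` up front, which a real unbounded stream does not allow.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

# Hypothetical row shape: (sender_login, user_id, event_time)
Row = Tuple[str, str, int]


def previous_user_id(rows: Iterable[Row]) -> Iterator[Tuple[str, Optional[str]]]:
    """Yield (sender_login, previous user_id) for every row.

    Mirrors LAST_VALUE(user_id) OVER (PARTITION BY sender_login
    ORDER BY event_time ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING):
    the first row of each key has no predecessor, so it yields None.
    """
    last_seen: Dict[str, str] = {}  # per-key state: most recent user_id
    for sender_login, user_id, _event_time in sorted(rows, key=lambda r: r[2]):
        yield sender_login, last_seen.get(sender_login)
        last_seen[sender_login] = user_id


sample = [
    ("alice", "u1", 1),
    ("bob", "u7", 2),
    ("alice", "u2", 3),
    ("alice", "u3", 4),
]
result = list(previous_user_id(sample))
print(result)
```

The only state kept is one value per key, which is the property the 
question is after: the previous record is available immediately, 
without waiting for a large window to close.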

Re: Beam SQL found limitations

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Piotr,

Thanks for details! I cross-post this to dev@ as well since, I guess, people there can provide more insights on this.

A while ago, I faced the similar issues trying to run Beam SQL against TPC-DS benchmark. 
We had a discussion around that [1], please, take a look since it can be helpful.

[1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b

—
Alexey 

> On 18 May 2023, at 11:36, Wiśniowski Piotr <co...@gmail.com> wrote:
> 
> HI,
> 
> After experimenting with Beam SQL I did find some limitations. Testing on near latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite, direct runner and openjdk version "11.0.19". Please let me know if some of them are known/ worked on/ have tickets or have estimated fix time. I believe most of them are low hanging fruits or just my thinking is not right for the problem. If this is the case please guide me to some working solution.
> 
>  From my perspective it is ok to have a fix just on master - no need to wait for release. Priority order: 
> - 7. Windowing function on a stream - in detail - How to get previous message for a key? setting expiration arbitrary big is ok, but access to the previous record must happen fairly quickly not wait for the big window to finish and emit the expired keys. Ideally would like to do it in pure beam pipeline as saving to some external key/value store and then reading this here could potentially result in some race conditions which in I would like to avoid, but if its the only option - let it be.
> - 5. single UNION ALL possible
> - 4. UNNEST ARRAY with nested ROW
> - 3. Using * when there is Row type present in the schema
> - 1. `CROSS JOIN` between two unrelated tables is not supported - even if one is a static number table
> - 2. ROW construction not supported. It is not possible to nest data
> 
> Below queries tat I use to testing this scenarios.
> 
> Thank You for looking at this topics!
> 
> Best
> 
> Wiśniowski Piotr
> 
> -----------------------
> -- 1. `CROSS JOIN` between two unrelated tables is not supported. 
> -----------------------
> -- Only supported is `CROSS JOIN UNNEST` when exploding array from same table.
> -- It is not possible to number rows
> WITH data_table AS (
>     SELECT 1 AS a
> ),
> number_table AS (
>     SELECT 
>         numbers_exploded AS number_item
>     FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS numbers_exploded
> )
> SELECT 
>     data_table.a,
>     number_table.number_item
> FROM data_table
> CROSS JOIN number_table
> ;
> -- CROSS JOIN, JOIN ON FALSE is not supported!
> 
> 
> -----------------------
> -- 2. ROW construction not supported. It is not possible to nest data
> -----------------------
> SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0 
> SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0 
> SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
> SELECT MAP['field1','b','field2','a']; -- null
> -- WORKAROUND - manually compose json string, 
> -- drawback - decomposing might be not supported or would need to be also based on string operations
> SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS `json_object`;
> 
> 
> -----------------------
> -- 3. Using * when there is Row type present in the schema
> -----------------------
> CREATE EXTERNAL TABLE test_tmp_1(
>     `ref` VARCHAR,
>     `author` ROW<
>         `name` VARCHAR,
>         `email` VARCHAR
>     >
> )
> TYPE text
> LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
> TBLPROPERTIES '{"format":"json", "deadLetterFile":"top/python/dbt/tests/dead"}';
> SELECT * FROM test_tmp_1;
> --  java.lang.NoSuchFieldException: name 
> -- WORKAROUND - refer to columns explicitly with alias
> SELECT 
>     `ref` AS ref_value, 
>     test_tmp_1.`author`.`name` AS author_name, -- table name must be referenced explicitly - this could be fixed too
>     test_tmp_1.`author`.`email` AS author_email
> FROM test_tmp_1;
> 
> 
> -----------------------
> -- 4. UNNEST ARRAY with nested ROW
> -----------------------
> CREATE EXTERNAL TABLE test_tmp(
>     `ref` VARCHAR,
>     `commits` ARRAY<ROW<
>         `id` VARCHAR,
>         `author` ROW<
>             `name` VARCHAR,
>             `email` VARCHAR
>         >
>     >>
> )
> TYPE text
> LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
> TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead"}';
> SELECT
>     test_tmp.`ref` AS branch_name,
>     commit_item.`id` AS commit_hash,
>     commit_item.`author`.`name` AS author_name
> FROM test_tmp
> CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
> -- Row expected 4 fields (Field{name=ref, description=, type=STRING, options={{}}}, Field{name=commits, description=, type=ARRAY<ROW<id STRING, author ROW<name STRING, email STRING>> NOT NULL>, options={{}}}, Field{name=id, description=, type=STRING, options={{}}}, Field{name=author, description=, type=ROW<name STRING, email STRING>, options={{}}}). initialized with 5 fields.
> -- limited WORKAROUND - refer to array elements by index and UNION ALL the items into rows
> -- note: the workaround that uses a number table will not work, as CROSS JOIN is not supported
> WITH data_parsed AS (
>     SELECT
>         test_tmp.`ref` AS branch_id,
>         test_tmp.commits[1].`id` AS commit_hash,
>         test_tmp.commits[1].`author`.`name` AS author_name
>     FROM test_tmp
>     UNION ALL -- this unfortunately works only for two indexes
>     SELECT
>         test_tmp.`ref` AS branch_id,
>         test_tmp.commits[2].`id` AS commit_hash,
>         test_tmp.commits[2].`author`.`name` AS author_name
>     FROM test_tmp
> )
> SELECT * 
> FROM data_parsed
> WHERE author_name IS NOT NULL
> ;
> -- better WORKAROUND - but tricky to get right (fragile)
> WITH data_with_number_array AS (
>     SELECT
>         test_tmp.`ref` AS branch_name, -- there must be some primary key in the data to join on later due to CROSS JOIN support limitation
>         ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] AS number_array,
>         CARDINALITY(test_tmp.commits) AS commits_size
>     FROM test_tmp
> ),
> data_with_numbers AS (
>     SELECT 
>         branch_name,
>         `EXPR$0` AS number_item
>     FROM data_with_number_array
>     CROSS JOIN UNNEST(data_with_number_array.number_array) AS exploded
>     WHERE `EXPR$0` <= commits_size
> ),
> data_exploded AS (
>     SELECT 
>         test_tmp.`ref` AS branch_name,
>         test_tmp.commits[data_with_numbers.number_item].`id` AS commit_hash,
>         test_tmp.commits[data_with_numbers.number_item].`author`.`name` AS author_name
>     FROM test_tmp
>     INNER JOIN data_with_numbers
>         ON data_with_numbers.branch_name = test_tmp.`ref`
> )
> SELECT
>     branch_name,
>     commit_hash,
>     author_name
> FROM data_exploded
> -- WHERE author_name IS NOT NULL - not possible here due to `Non equi-join is not supported`,
> -- as this condition gets pushed down into the join as a predicate.
> -- Is there any way to force this condition to be checked here rather than pushed upstream?
> ;
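The logic of the "better workaround" above can be sketched in plain Python to make its assumptions visible. This is not Beam code: the sample row is made up, and `explode_commits` is a hypothetical helper that follows the same steps as the SQL (fixed number array, filter by cardinality, 1-based index lookup).

```python
# One row shaped like test_tmp from section 4 (values are made up).
test_tmp = [
    {
        "ref": "main",
        "commits": [
            {"id": "c1", "author": {"name": "Ann", "email": "ann@example.com"}},
            {"id": "c2", "author": {"name": "Bob", "email": "bob@example.com"}},
        ],
    }
]

NUMBER_ARRAY = list(range(1, 17))  # same fixed 1..16 array as in the SQL

def explode_commits(rows):
    exploded = []
    for row in rows:
        commits_size = len(row["commits"])  # CARDINALITY(commits)
        for number_item in NUMBER_ARRAY:
            if number_item > commits_size:  # WHERE number_item <= commits_size
                continue
            commit = row["commits"][number_item - 1]  # SQL array indexes are 1-based
            exploded.append(
                {
                    "branch_name": row["ref"],
                    "commit_hash": commit["id"],
                    "author_name": commit["author"]["name"],
                }
            )
    return exploded

rows_out = explode_commits(test_tmp)
```

Two things stand out in this form: any row with more than 16 commits loses the extra elements, because the number array is fixed, and the SQL version additionally relies on `ref` being unique so that the join back to `test_tmp` does not duplicate rows.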
> 
> 
> -----------------------
> -- 5. Only a single UNION ALL is possible
> -----------------------
> SELECT 1 AS a
> UNION ALL
> SELECT 2 AS a
> UNION ALL
> SELECT 3 AS a;
> -- Wrong number of arguments to BeamUnionRel: org.apache.beam.sdk.values.PCollectionList@70f145ac
> 
> 
> -----------------------
> -- 6. Reserved names
> -----------------------
> -- json_object
> SELECT '{}' AS json_object; 
> -- parse failed: Incorrect syntax near the keyword 'AS' at line 1, column 13.
> -- WORKAROUND SELECT '{}' AS `json_object`
> 
> -----------------------
> -- 7. Windowing function on stream
> -----------------------
> -- In detail: how to get the previous message for a key?
> -- Setting an arbitrarily large expiration is OK, but access to the previous record must happen fairly quickly,
> -- without waiting for the big window to finish and emit the expired keys.
> -- Ideally I would like to do it in a pure Beam pipeline, as saving to some external key/value store
> -- and then reading it back could result in race conditions which would be hard to debug.
> DROP TABLE IF EXISTS unbounded_stream;
> CREATE EXTERNAL TABLE unbounded_stream(
>     sequence BIGINT,
>     event_time TIMESTAMP
> )
> TYPE 'sequence'
> TBLPROPERTIES '{"elementsPerSecond":1}'
> ;
> CREATE EXTERNAL TABLE data_1_bounded(
>     `sequence_nb` BIGINT,
>     `sender_login` VARCHAR,
>     `user_id` VARCHAR
> )
> TYPE text
> LOCATION 'python/dbt/tests/sql_functionality_tests/04_get_last_identity/data_1.jsonl'
> TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead_letters/dead"}' 
> ;
> WITH
> test_data_1_unbounded AS (
>     SELECT 
>         sender_login,
>         user_id,
>         event_time
>     FROM unbounded_stream
>     INNER JOIN data_1_bounded
>        ON unbounded_stream.sequence = data_1_bounded.sequence_nb
> ),
> test_data_1_lookbehind AS (
>     SELECT 
>         sender_login,
>         LAST_VALUE(user_id) OVER previous_win AS user_id
>     FROM test_data_1_unbounded
>     WINDOW previous_win AS (
>         PARTITION BY sender_login
>         ORDER BY event_time ASC
>         ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
>     )
> )
> SELECT * 
> FROM test_data_1_lookbehind 
> LIMIT 8
> ;
> -- There are not enough rules to produce a node with desired properties: convention=ENUMERABLE. All the inputs have relevant nodes, however the cost is still infinite.
> -- Root: rel#29:RelSubset#4.ENUMERABLE
> -- Original rel:
> -- LogicalSort(fetch=[8]): rowcount = 1.2, cumulative cost = {12.599999999999998 rows, 35.4 cpu, 9.0 io}, id = 16
> --   LogicalProject(sender_login=[$3], user_id=[LAST_VALUE($4) OVER (PARTITION BY $3 ORDER BY $1 ROWS 1 PRECEDING)]): rowcount = 1.2, cumulative cost = {11.399999999999999 rows, 11.4 cpu, 9.0 io}, id = 14
> --     LogicalJoin(condition=[=($0, $2)], joinType=[inner]): rowcount = 1.2, cumulative cost = {10.2 rows, 9.0 cpu, 9.0 io}, id = 12
> --       BeamIOSourceRel(table=[[beam, unbounded_stream]]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 1.0 io}, id = 1
> --       BeamIOSourceRel(table=[[beam, data_1_bounded]]): rowcount = 8.0, cumulative cost = {8.0 rows, 8.0 cpu, 8.0 io}, id = 3
> -- 
> -- Sets:
> -- Set#0, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time)
> --         rel#18:RelSubset#0.BEAM_LOGICAL, best=rel#1
> --                 rel#1:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, unbounded_stream]), rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 1.0 io}
> --         rel#33:RelSubset#0.ENUMERABLE, best=rel#32
> --                 rel#32:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#18), rowcount=1.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> -- Set#1, type: RecordType(BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
> --         rel#19:RelSubset#1.BEAM_LOGICAL, best=rel#3
> --                 rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, data_1_bounded]), rowcount=8.0, cumulative cost={8.0 rows, 8.0 cpu, 8.0 io}
> --         rel#36:RelSubset#1.ENUMERABLE, best=rel#35
> --                 rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#19), rowcount=8.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> -- Set#2, type: RecordType(BIGINT sequence, TIMESTAMP(6) event_time, BIGINT sequence_nb, VARCHAR sender_login, VARCHAR user_id)
> --         rel#21:RelSubset#2.NONE, best=null
> --                 rel#20:LogicalJoin.NONE(left=RelSubset#18,right=RelSubset#19,condition==($0, $2),joinType=inner), rowcount=1.2, cumulative cost={inf}
> --                 rel#40:LogicalProject.NONE(input=RelSubset#39,exprs=[$3, $4, $0, $1, $2]), rowcount=1.2, cumulative cost={inf}
> --                 rel#65:LogicalCalc.NONE(input=RelSubset#39,expr#0..4={inputs},0=$t3,1=$t4,2=$t0,3=$t1,4=$t2), rowcount=1.2, cumulative cost={inf}
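The behaviour asked for in section 7 (emit the previous `user_id` for each `sender_login` as soon as a new event arrives) amounts to keeping one remembered value per key. In Beam that kind of logic is typically written with per-key state in a stateful DoFn rather than with SQL windows. The sketch below is plain Python and does not use the Beam API; `with_previous` is a hypothetical helper, and it assumes events for a key arrive in event-time order.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

def with_previous(
    events: Iterable[Tuple[str, str]]
) -> Iterator[Tuple[str, Optional[str]]]:
    """Yield (sender_login, previous user_id) for each event, in arrival order."""
    last_seen: Dict[str, str] = {}  # stands in for per-key state
    for sender_login, user_id in events:
        # None for the first event of a key, like an empty 1-PRECEDING frame.
        yield sender_login, last_seen.get(sender_login)
        last_seen[sender_login] = user_id

# Made-up events: (sender_login, user_id)
events = [("alice", "u1"), ("bob", "u7"), ("alice", "u2"), ("alice", "u3")]
result = list(with_previous(events))
```

Each output is produced immediately when its input arrives, so nothing waits for a window to close; the cost is that the remembered value per key has to be expired explicitly if keys should not be kept forever.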