Posted to user@beam.apache.org by Talat Uyarer <tu...@paloaltonetworks.com> on 2020/12/08 01:44:13 UTC

About Beam SQL Schema Changes and Code generation

Hi,

We are using Beam SQL in our pipeline. Our data is written in Avro format,
and we generate our rows based on our Avro schema. Over time the schema
changes. I believe Beam SQL generates Java code based on what we define as
the BeamSchema while submitting the pipeline. Do you have any idea how we can
handle schema changes without resubmitting our Beam job? Is it possible to
generate the SQL Java code on the fly?

Thanks
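
For illustration, here is a minimal, hypothetical sketch of the pipeline shape in
question (the field names and the Create.of stand-in for the Kafka/Avro steps are
invented). The point is that the Row schema, and therefore the code SqlTransform
generates, is fixed when the pipeline is constructed:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class FixedSchemaSqlPipeline {
  public static void main(String[] args) {
    // Beam schema derived from the Avro schema that is current at submission time.
    Schema beamSchema = Schema.builder()
        .addStringField("col1")
        .addInt64Field("col2")
        .build();

    Pipeline p = Pipeline.create();
    PCollection<Row> rows = p.apply(
        Create.of(Row.withSchema(beamSchema).addValues("a", 1L).build())
            .withCoder(RowCoder.of(beamSchema)));

    // SqlTransform plans and code-generates against beamSchema; a field added to the
    // Avro schema after submission is simply not part of this plan.
    rows.apply(SqlTransform.query("SELECT col1, col2 FROM PCOLLECTION"));

    p.run().waitUntilFinish();
  }
}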

Re: About Beam SQL Schema Changes and Code generation

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Yes Reuven, I would like to write a proposal for that. I also like Andrew
Pilloud's idea: we can put only the necessary fields on the Row, and the rest of
them can stay on the unknown-fields side. We are using Beam Calcite SQL. Is
that OK?
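
As a rough illustration of that idea (this is not an existing Beam feature; the column
name and helper below are made up), the inferred schema could carry one extra map-valued
field holding the raw bytes of fields the submitted schema does not know about:

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;

public class UnknownFieldsSchema {
  // Append a map<string, bytes> column to an inferred schema. Only the fields the SQL
  // actually references stay as ordinary columns; everything else would travel in the
  // "__unknown_fields" map and be re-attached when converting back to Avro.
  public static Schema withUnknownFields(Schema inferred) {
    Schema.Builder builder = Schema.builder();
    inferred.getFields().forEach(builder::addField);
    builder.addField("__unknown_fields",
        FieldType.map(FieldType.STRING, FieldType.BYTES));
    return builder.build();
  }
}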


On Tue, Dec 8, 2020 at 3:15 PM Reuven Lax <re...@google.com> wrote:

> Talat, are you interested in writing a proposal and sending it to
> dev@beam.apache.org? We could help advise on the options.
>
> Reuven
>
> On Tue, Dec 8, 2020 at 10:28 AM Andrew Pilloud <ap...@google.com>
> wrote:
>
>> We could support EXCEPT statements in proposal 2 as long as we restricted
>> it to known fields.
>>
>> We are getting into implementation details now. Making unknown fields
>> just a normal column introduces a number of problems. ZetaSQL doesn't
>> support Map type. All our IOs would need to explicitly deal with that
>> special column. There would be a lack of consistency between the various
>> types (Avro, Proto, Json) which should all support this.
>>
>> We might also want something even more invasive: everything is an unknown
>> field unless it is referenced in the SQL query. All of these options are
>> possible. I guess we need someone who has time to work on it to write a
>> proposal.
>>
>> On Tue, Dec 8, 2020 at 10:03 AM Reuven Lax <re...@google.com> wrote:
>>
>>> I'm not sure that we could support EXCEPT statements, as that would
>>> require introspecting the unknown fields (what if the EXCEPT statement
>>> matches a field that later is added as an unknown field?). IMO this sort of
>>> behavior only makes sense on true pass-through queries. Anything that
>>> modifies the input record would be tricky to support.
>>>
>>> Nested rows would work for proposal 2. You would need to make sure that
>>> the unknown-fields map is recursively added to all nested rows, and you
>>> would do this when you infer a schema from the avro schema.
>>>
>>> On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud <ap...@google.com>
>>> wrote:
>>>
>>>> Proposal 1 would also interact poorly with SELECT * EXCEPT ...
>>>> statements, which returns all columns except specific ones. Adding an
>>>> unknown field does seem like a reasonable way to handle this. It probably
>>>> needs to be something that is native to the Row type, so columns added to
>>>> nested rows also work.
>>>>
>>>> Andrew
>>>>
>>>> On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> There's a difference between a fully dynamic schema and simply being
>>>>> able to forward "unknown" fields to the output.
>>>>>
>>>>> A fully-dynamic schema is not really necessary unless we also had
>>>>> dynamic SQL statements. Since the existing SQL statements do not reference
>>>>> the new fields by name, there's no reason to add them to the main schema.
>>>>>
>>>>> However, if you have a SELECT * FROM WHERE XXXX statement that does no
>>>>> aggregation, there's fundamentally no reason we couldn't forward the
>>>>> messages exactly. In theory we could forward the exact bytes that are in
>>>>> the input PCollection, which would necessarily forward the new fields. In
>>>>> practice I believe that we convert the input messages to Beam Row objects
>>>>> in order to evaluate the WHERE clause, and then convert back to Avro to
>>>>> output those messages. I believe this is where we "lose" the unknown
>>>>> messages, but this is an implementation artifact - in theory we could output
>>>>> the original bytes whenever we see a SELECT *. This is not truly a dynamic
>>>>> schema, since you can't really do anything with these extra fields except
>>>>> forward them to your output.
>>>>>
>>>>> I see two possible ways to address this.
>>>>>
>>>>> 1. As I mentioned above, in the case of a SELECT * we could output the
>>>>> original bytes, and only use the Beam Row for evaluating the WHERE clause.
>>>>> This might be very expensive though - we risk having to keep two copies of
>>>>> every message around, one in the original Avro format and one in Row format.
>>>>>
>>>>> 2. The other way would be to do what protocol buffers do. We could add
>>>>> one extra field to the inferred Beam schema to store new, unknown fields
>>>>> (probably this would be a map-valued field). This extra field would simply
>>>>> store the raw bytes of these unknown fields, and then when converting back
>>>>> to Avro they would be added to the output message. This might also add some
>>>>> overhead to the pipeline, so might be best to make this behavior opt in.
>>>>>
>>>>> Reuven
>>>>>
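
A rough, hypothetical sketch of proposal 1 above (names invented; this is not how
SqlTransform is implemented today): keep the original Avro bytes next to the converted
Row, evaluate the WHERE clause against the Row, and emit the untouched bytes, so fields
unknown to the Beam schema survive:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;

public class PassThroughFilterFn extends DoFn<KV<byte[], Row>, byte[]> {
  // Stands in for the predicate compiled from the WHERE clause.
  private final SerializableFunction<Row, Boolean> whereClause;

  public PassThroughFilterFn(SerializableFunction<Row, Boolean> whereClause) {
    this.whereClause = whereClause;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // The Row exists only to evaluate the predicate; the output is the original bytes.
    // The cost is carrying two representations of every message, as noted above.
    if (whereClause.apply(c.element().getValue())) {
      c.output(c.element().getKey());
    }
  }
}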
>>>>> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette <bh...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Reuven, could you clarify what you have in mind? I know multiple
>>>>>> times we've discussed the possibility of adding update compatibility
>>>>>> support to SchemaCoder, including support for certain schema changes (field
>>>>>> additions/deletions) - I think the most recent discussion was here [1].
>>>>>>
>>>>>> But it sounds like Talat is asking for something a little beyond
>>>>>> that, effectively a dynamic schema. Is that something you think we can
>>>>>> support?
>>>>>>
>>>>>> [1]
>>>>>> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>>>>>>
>>>>>> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Thanks. It might be theoretically possible to do this (at least for
>>>>>>> the case where existing fields do not change). Whether anyone currently has
>>>>>>> available time to do this is a different question, but it's something that
>>>>>>> can be looked into.
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <
>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>
>>>>>>>> Adding new fields is more common than modifying existing fields.
>>>>>>>> But type change is also possible for existing fields, such as regular
>>>>>>>> mandatory field(string,integer) to union(nullable field). No field
>>>>>>>> deletion.
>>>>>>>>
>>>>>>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> And when you say schema changes, are these new fields being added
>>>>>>>>> to the schema? Or are you making changes to the existing fields?
>>>>>>>>>
>>>>>>>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> For sure let me explain a little bit about my pipeline.
>>>>>>>>>> My pipeline is actually simple:
>>>>>>>>>> Read Kafka -> Convert Avro Bytes to Beam Row (DoFn<KV<byte[], byte[]>,
>>>>>>>>>> Row>) -> Apply Filter (SqlTransform.query(sql)) -> Convert back
>>>>>>>>>> from Row to Avro (DoFn<Row, byte[]>) -> Write DB/GCS/GRPC etc.
>>>>>>>>>>
>>>>>>>>>> On our jobs we have three types of SQL:
>>>>>>>>>> - SELECT * FROM PCOLLECTION
>>>>>>>>>> - SELECT * FROM PCOLLECTION <with WHERE condition>
>>>>>>>>>> - SQL projection with or without a WHERE clause: SELECT col1, col2
>>>>>>>>>> FROM PCOLLECTION
>>>>>>>>>>
>>>>>>>>>> We know the writer schema for each message. While deserializing the Avro
>>>>>>>>>> binary we use the writer schema and reader schema in the Convert Avro Bytes to Beam
>>>>>>>>>> Row step. It always produces a generic record with the reader schema, and we
>>>>>>>>>> convert that generic record to a Row.
>>>>>>>>>> While submitting the DF job we use the latest schema to generate the
>>>>>>>>>> beamSchema.
>>>>>>>>>>
>>>>>>>>>> In the current scenario, when we have schema changes we first
>>>>>>>>>> restart all 15k jobs with the latest updated schema, and then once we are
>>>>>>>>>> done we turn on the latest schema for the writers. Because of Avro's
>>>>>>>>>> GrammarResolver[1] we read different versions of the schema and we always
>>>>>>>>>> produce the latest schema's record. Without breaking our pipeline we are
>>>>>>>>>> able to handle multiple versions of data in the same streaming pipeline. If
>>>>>>>>>> we could regenerate the SQL's Java code when we get notified with the latest
>>>>>>>>>> schema, we would handle all schema changes. The only remaining obstacle is
>>>>>>>>>> Beam's SQL Java code. That's why I am looking for a solution. We don't need
>>>>>>>>>> multiple versions of SQL. We only need to regenerate the SQL schema with the
>>>>>>>>>> latest schema on the fly.
>>>>>>>>>>
>>>>>>>>>> I hope I can explain it :)
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
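
For reference, a minimal sketch of that deserialization step using Avro's standard
schema resolution (the machinery described in [1]); the class and method names here
are made up for illustration:

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class AvroToLatest {
  // Decodes bytes written with an older writer schema into a record of the latest
  // reader schema; that record can then be converted to a Beam Row.
  public static GenericRecord decode(byte[] payload, Schema writerSchema, Schema readerSchema)
      throws IOException {
    GenericDatumReader<GenericRecord> reader =
        new GenericDatumReader<>(writerSchema, readerSchema);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
    return reader.read(null, decoder);
  }
}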
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can you explain the use case some more? Are you wanting to
>>>>>>>>>>> change your SQL statement as well when the schema changes? If not, what are
>>>>>>>>>>> those new fields doing in the pipeline? What I mean is that your old SQL
>>>>>>>>>>> statement clearly didn't reference those fields in a SELECT statement since
>>>>>>>>>>> they didn't exist, so what are you missing by not having them unless you
>>>>>>>>>>> are also changing the SQL statement?
>>>>>>>>>>>
>>>>>>>>>>> Is this a case where you have a SELECT *, and just want to make
>>>>>>>>>>> sure those fields are included?
>>>>>>>>>>>
>>>>>>>>>>> Reuven
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Andrew,
>>>>>>>>>>>>
>>>>>>>>>>>> I assume the SQL query is not going to change. What changes is
>>>>>>>>>>>> the Row schema, by adding new columns or renaming columns. If we keep
>>>>>>>>>>>> version information somewhere, for example as a KV pair where the key is the
>>>>>>>>>>>> schema information and the value is the Row, can we not generate the SQL code?
>>>>>>>>>>>> The reason I am asking is that we have 15k pipelines. When we have a schema
>>>>>>>>>>>> change we restart 15k DF jobs, which is painful. I am looking for a possible
>>>>>>>>>>>> way to avoid the job restart. Don't you think it is still doable?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <
>>>>>>>>>>>> apilloud@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Unfortunately we don't have a way to generate the SQL Java
>>>>>>>>>>>>> code on the fly, even if we did, that wouldn't solve your problem. I
>>>>>>>>>>>>> believe our recommended practice is to run both the old and new pipeline
>>>>>>>>>>>>> for some time, then pick a window boundary to transition the output from
>>>>>>>>>>>>> the old pipeline to the new one.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>>>>>>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>>>>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>>>>>>>>>>> schema evolution. They are optimized for performance at all costs.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you worked around this, the Beam model doesn't support
>>>>>>>>>>>>> changing the structure of the pipeline graph. This would significantly
>>>>>>>>>>>>> limit the changes you can make. It would also require some changes to SQL
>>>>>>>>>>>>> to try to produce the same plan for an updated SQL query.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We are using Beamsql on our pipeline. Our Data is written in
>>>>>>>>>>>>>> Avro format. We generate our rows based on our Avro schema. Over time the
>>>>>>>>>>>>>> schema is changing. I believe Beam SQL generates Java code based on what we
>>>>>>>>>>>>>> define as BeamSchema while submitting the pipeline. Do you have any idea
>>>>>>>>>>>>>> How can we handle schema changes with resubmitting our beam job. Is it
>>>>>>>>>>>>>> possible to generate SQL java code on the fly ?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>
>>>>>>>>>>>>>

Re: About Beam SQL Schema Changes and Code generation

Posted by Kobe Feng <fl...@gmail.com>.
Talat, my bad, first things first: to resolve the issue, your proposal would
definitely be a helpful starting point for researching schema evolution in a Beam
pipeline, and I could comment there if anything comes up.

Andrew's first reply is clear about the intention and scope for Apache Beam:
a static graph for maximum optimization.
I just think both ways are more of a compromise, which could be done by the
app itself if it converts between different formats.

Maybe in the future we could optionally choose ultimate performance (fixed
coder, static SQL plan) or ultimate flexibility (schema evolution, plugin
lambdas, dynamic routing, etc.) or balance them when using SQL ^ ^

Re: About Beam SQL Schema Changes and Code generation

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Kobe, could you elaborate on your idea a little bit?


On Tue, Dec 8, 2020, 6:27 PM Kobe Feng <fl...@gmail.com> wrote:

> Hi all,
> Sorry for the step-in. This case reminds me the similar req. in my company
> for plugin lambda func in beam's pipeline dynamically like filtering,
> selecting, etc. without restarting the job long time ago, like flink
> stateful functions, AKKA, etc.
>
> Generally, SQL defines input, output, and transformation explicit which
> means fix schema and coder usually (using * is arbitrary, nowadays SQL more
> change to newSQL due to NoSQL and decouple with storage layer, loosing the
> restrictions but for more flexible processing capability)
>
> So if we want to support schema-free in streaming pipeline natively, could
> we consider providing such capability from beam core part too (for higher
> transparency and possibly be leveraged by SQL layer too), like the
> capability for plugin coder with runtime compatible check with prev ones,
> stateful functions (not beam's stateful processing), in-out data with
> schema Id for schema-based transform, etc.
>
> I'm kinder of being away from apache beam for a while, sorry if beam
> already had such native support or I misunderstood.
>
> Thanks!
> Kobe Feng
>
> On Tue, Dec 8, 2020 at 3:15 PM Reuven Lax <re...@google.com> wrote:
>
>> Talat, are you interested in writing a proposal and sending it to
>> dev@beam.apache.org? We could help advise on the options.
>>
>> Reuven
>>
>> On Tue, Dec 8, 2020 at 10:28 AM Andrew Pilloud <ap...@google.com>
>> wrote:
>>
>>> We could support EXPECT statements in proposal 2 as long as we
>>> restricted it to known fields.
>>>
>>> We are getting into implementation details now. Making unknown fields
>>> just a normal column introduces a number of problems. ZetaSQL doesn't
>>> support Map type. All our IOs would need to explicitly deal with that
>>> special column. There would be a lack of consistency between the various
>>> types (Avro, Proto, Json) which should all support this.
>>>
>>> We might also want something even more invasive: everything is an
>>> unknown field unless it is referenced in the SQL query. All of these
>>> options are possible. I guess we need someone who has time to work on it to
>>> write a proposal.
>>>
>>> On Tue, Dec 8, 2020 at 10:03 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I'm not sure that we could support EXCEPT statements, as that would
>>>> require introspecting the unknown fields (what if the EXCEPT statement
>>>> matches a field that later is added as an unknown field?). IMO this sort of
>>>> behavior only makes sense on true pass-through queries. Anything that
>>>> modifies the input record would be tricky to support.
>>>>
>>>> Nested rows would work for proposal 2. You would need to make sure that
>>>> the unknown-fields map is recursively added to all nested rows, and you
>>>> would do this when you infer a schema from the avro schema.
>>>>
>>>> On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud <ap...@google.com>
>>>> wrote:
>>>>
>>>>> Proposal 1 would also interact poorly with SELECT * EXCEPT ...
>>>>> statements, which returns all columns except specific ones. Adding an
>>>>> unknown field does seem like a reasonable way to handle this. It probably
>>>>> needs to be something that is native to the Row type, so columns added to
>>>>> nested rows also work.
>>>>>
>>>>> Andrew
>>>>>
>>>>> On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> There's a difference between a fully dynamic schema and simply being
>>>>>> able to forward "unknown" fields to the output.
>>>>>>
>>>>>> A fully-dynamic schema is not really necessary unless we also had
>>>>>> dynamic SQL statements. Since the existing SQL statements do not reference
>>>>>> the new fields by name, there's no reason to add them to the main schema.
>>>>>>
>>>>>> However, if you have a SELECT * FROM WHERE XXXX statement that does
>>>>>> no aggregation, there's fundamentally no reason we couldn't forward the
>>>>>> messages exactly. In theory we could forward the exact bytes that are in
>>>>>> the input PCollection, which would necessarily forward the new fields. In
>>>>>> practice I believe that we convert the input messages to Beam Row objects
>>>>>> in order to evaluate the WHERE clause, and then convert back to Avro to
>>>>>> output those messages. I believe this is where we "lose" the unknown
>>>>>> messages,but this is an implementation artifact - in theory we could output
>>>>>> the original bytes whenever we see a SELECT *. This is not truly a dynamic
>>>>>> schema, since you can't really do anything with these extra fields except
>>>>>> forward them to your output.
>>>>>>
>>>>>> I see two possible ways to address this.
>>>>>>
>>>>>> 1. As I mentioned above, in the case of a SELECT * we could output
>>>>>> the original bytes, and only use the Beam Row for evaluating the WHERE
>>>>>> clause. This might be very expensive though - we risk having to keep two
>>>>>> copies of every message around, one in the original Avro format and one in
>>>>>> Row format.
>>>>>>
>>>>>> 2. The other way would be to do what protocol buffers do. We could
>>>>>> add one extra field to the inferred Beam schema to store new, unknown
>>>>>> fields (probably this would be a map-valued field). This extra field would
>>>>>> simply store the raw bytes of these unknown fields, and then when
>>>>>> converting back to Avro they would be added to the output message. This
>>>>>> might also add some overhead to the pipeline, so might be best to make this
>>>>>> behavior opt in.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette <bh...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Reuven, could you clarify what you have in mind? I know multiple
>>>>>>> times we've discussed the possibility of adding update compatibility
>>>>>>> support to SchemaCoder, including support for certain schema changes (field
>>>>>>> additions/deletions) - I think the most recent discussion was here [1].
>>>>>>>
>>>>>>> But it sounds like Talat is asking for something a little beyond
>>>>>>> that, effectively a dynamic schema. Is that something you think we can
>>>>>>> support?
>>>>>>>
>>>>>>> [1]
>>>>>>> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>>>>>>>
>>>>>>> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Thanks. It might be theoretically possible to do this (at least for
>>>>>>>> the case where existing fields do not change). Whether anyone currently has
>>>>>>>> available time to do this is a different question, but it's something that
>>>>>>>> can be looked into.
>>>>>>>>
>>>>>>>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <
>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>
>>>>>>>>> Adding new fields is more common than modifying existing fields.
>>>>>>>>> But type change is also possible for existing fields, such as regular
>>>>>>>>> mandatory field(string,integer) to union(nullable field). No field
>>>>>>>>> deletion.
>>>>>>>>>
>>>>>>>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> And when you say schema changes, are these new fields being added
>>>>>>>>>> to the schema? Or are you making changes to the existing fields?
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> For sure let me explain a little bit about my pipeline.
>>>>>>>>>>> My Pipeline is actually simple
>>>>>>>>>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn<KV<byte[],
>>>>>>>>>>> byte[]>, Row>) -> Apply Filter(SqlTransform.query(sql)) ->
>>>>>>>>>>> Convert back from Row to Avro (DoFn<Row, byte[]>)-> Write
>>>>>>>>>>> DB/GCS/GRPC etc
>>>>>>>>>>>
>>>>>>>>>>> On our jobs We have three type sqls
>>>>>>>>>>> - SELECT * FROM PCOLLECTION
>>>>>>>>>>> - SELECT * FROM PCOLLECTION <with Where Condition>
>>>>>>>>>>> - SQL Projection with or without Where clause  SELECT col1, col2
>>>>>>>>>>> FROM PCOLLECTION
>>>>>>>>>>>
>>>>>>>>>>> We know writerSchema for each message. While deserializing avro
>>>>>>>>>>> binary we use writer schema and reader schema on Convert Avro Bytes to Beam
>>>>>>>>>>> Row step. It always produces a reader schema's generic record and we
>>>>>>>>>>> convert that generic record to Row.
>>>>>>>>>>> While submitting DF job we use latest schema to generate
>>>>>>>>>>> beamSchema.
>>>>>>>>>>>
>>>>>>>>>>> In the current scenario When we have schema changes first we
>>>>>>>>>>> restart all 15k jobs with the latest updated schema then whenever we are
>>>>>>>>>>> done we turn on the latest schema for writers. Because of Avro's
>>>>>>>>>>> GrammerResolver[1] we read different versions of the schema and we always
>>>>>>>>>>> produce the latest schema's record. Without breaking our pipeline we are
>>>>>>>>>>> able to handle multiple versions of data in the same streaming pipeline. If
>>>>>>>>>>> we can generate SQL's java code when we get notified wirth latest schema we
>>>>>>>>>>> will handle all schema changes. The only remaining obstacle is Beam's SQL
>>>>>>>>>>> Java code. That's why I am looking for some solution. We dont need multiple
>>>>>>>>>>> versions of SQL. We only need to regenerate SQL schema with the latest
>>>>>>>>>>> schema on the fly.
>>>>>>>>>>>
>>>>>>>>>>> I hope I can explain it :)
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Can you explain the use case some more? Are you wanting to
>>>>>>>>>>>> change your SQL statement as well when the schema changes? If not, what are
>>>>>>>>>>>> those new fields doing in the pipeline? What I mean is that your old SQL
>>>>>>>>>>>> statement clearly didn't reference those fields in a SELECT statement since
>>>>>>>>>>>> they didn't exist, so what are you missing by not having them unless you
>>>>>>>>>>>> are also changing the SQL statement?
>>>>>>>>>>>>
>>>>>>>>>>>> Is this a case where you have a SELECT *, and just want to make
>>>>>>>>>>>> sure those fields are included?
>>>>>>>>>>>>
>>>>>>>>>>>> Reuven
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Andrew,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I assume SQL query is not going to change. Changing things is
>>>>>>>>>>>>> the Row schema by adding new columns or rename columns. if we keep a
>>>>>>>>>>>>> version information on somewhere for example a KV pair. Key is schema
>>>>>>>>>>>>> information, value is Row. Can not we generate SQL code ? Why I am asking
>>>>>>>>>>>>> We have 15k pipelines. When we have a schema change we restart a 15k DF job
>>>>>>>>>>>>> which is pain. I am looking for a possible way to avoid job restart. Dont
>>>>>>>>>>>>> you think it is not still doable ?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <
>>>>>>>>>>>>> apilloud@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Unfortunately we don't have a way to generate the SQL Java
>>>>>>>>>>>>>> code on the fly, even if we did, that wouldn't solve your problem. I
>>>>>>>>>>>>>> believe our recommended practice is to run both the old and new pipeline
>>>>>>>>>>>>>> for some time, then pick a window boundary to transition the output from
>>>>>>>>>>>>>> the old pipeline to the new one.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>>>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>>>>>>>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>>>>>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>>>>>>>>>>>> schema evolution. They are optimized for performance at all costs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If you worked around this, the Beam model doesn't support
>>>>>>>>>>>>>> changing the structure of the pipeline graph. This would significantly
>>>>>>>>>>>>>> limit the changes you can make. It would also require some changes to SQL
>>>>>>>>>>>>>> to try to produce the same plan for an updated SQL query.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We are using Beamsql on our pipeline. Our Data is written in
>>>>>>>>>>>>>>> Avro format. We generate our rows based on our Avro schema. Over time the
>>>>>>>>>>>>>>> schema is changing. I believe Beam SQL generates Java code based on what we
>>>>>>>>>>>>>>> define as BeamSchema while submitting the pipeline. Do you have any idea
>>>>>>>>>>>>>>> How can we handle schema changes with resubmitting our beam job. Is it
>>>>>>>>>>>>>>> possible to generate SQL java code on the fly ?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>
> --
> Yours Sincerely
> Kobe Feng
>

Re: About Beam SQL Schema Changes and Code generation

Posted by Kobe Feng <fl...@gmail.com>.
Hi all,
Sorry for stepping in. This case reminds me of a similar requirement at my company
a long time ago: plugging lambda functions into Beam's pipeline dynamically (filtering,
selecting, etc.) without restarting the job, like Flink stateful functions, Akka, etc.

Generally, SQL defines input, output, and transformation explicitly, which
usually means a fixed schema and coder (using * is arbitrary; nowadays SQL is moving
more toward NewSQL due to NoSQL and decoupling from the storage layer, loosening the
restrictions in exchange for more flexible processing capability).

So if we want to support schema-free streaming pipelines natively, could
we consider providing such capability from the Beam core as well (for higher
transparency, and so it could possibly be leveraged by the SQL layer too)? For example:
the ability to plug in a coder with runtime compatibility checks against previous ones,
stateful functions (not Beam's stateful processing), and in/out data carrying a
schema ID for schema-based transforms.

I'm kind of out of touch with Apache Beam for a while now, so sorry if Beam
already has such native support or I misunderstood.

Thanks!
Kobe Feng

On Tue, Dec 8, 2020 at 3:15 PM Reuven Lax <re...@google.com> wrote:

> Talat, are you interested in writing a proposal and sending it to
> dev@beam.apache.org? We could help advise on the options.
>
> Reuven
>
> On Tue, Dec 8, 2020 at 10:28 AM Andrew Pilloud <ap...@google.com>
> wrote:
>
>> We could support EXPECT statements in proposal 2 as long as we restricted
>> it to known fields.
>>
>> We are getting into implementation details now. Making unknown fields
>> just a normal column introduces a number of problems. ZetaSQL doesn't
>> support Map type. All our IOs would need to explicitly deal with that
>> special column. There would be a lack of consistency between the various
>> types (Avro, Proto, Json) which should all support this.
>>
>> We might also want something even more invasive: everything is an unknown
>> field unless it is referenced in the SQL query. All of these options are
>> possible. I guess we need someone who has time to work on it to write a
>> proposal.
>>
>> On Tue, Dec 8, 2020 at 10:03 AM Reuven Lax <re...@google.com> wrote:
>>
>>> I'm not sure that we could support EXCEPT statements, as that would
>>> require introspecting the unknown fields (what if the EXCEPT statement
>>> matches a field that later is added as an unknown field?). IMO this sort of
>>> behavior only makes sense on true pass-through queries. Anything that
>>> modifies the input record would be tricky to support.
>>>
>>> Nested rows would work for proposal 2. You would need to make sure that
>>> the unknown-fields map is recursively added to all nested rows, and you
>>> would do this when you infer a schema from the avro schema.
>>>
>>> On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud <ap...@google.com>
>>> wrote:
>>>
>>>> Proposal 1 would also interact poorly with SELECT * EXCEPT ...
>>>> statements, which returns all columns except specific ones. Adding an
>>>> unknown field does seem like a reasonable way to handle this. It probably
>>>> needs to be something that is native to the Row type, so columns added to
>>>> nested rows also work.
>>>>
>>>> Andrew
>>>>
>>>> On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> There's a difference between a fully dynamic schema and simply being
>>>>> able to forward "unknown" fields to the output.
>>>>>
>>>>> A fully-dynamic schema is not really necessary unless we also had
>>>>> dynamic SQL statements. Since the existing SQL statements do not reference
>>>>> the new fields by name, there's no reason to add them to the main schema.
>>>>>
>>>>> However, if you have a SELECT * FROM WHERE XXXX statement that does no
>>>>> aggregation, there's fundamentally no reason we couldn't forward the
>>>>> messages exactly. In theory we could forward the exact bytes that are in
>>>>> the input PCollection, which would necessarily forward the new fields. In
>>>>> practice I believe that we convert the input messages to Beam Row objects
>>>>> in order to evaluate the WHERE clause, and then convert back to Avro to
>>>>> output those messages. I believe this is where we "lose" the unknown
>>>>> messages,but this is an implementation artifact - in theory we could output
>>>>> the original bytes whenever we see a SELECT *. This is not truly a dynamic
>>>>> schema, since you can't really do anything with these extra fields except
>>>>> forward them to your output.
>>>>>
>>>>> I see two possible ways to address this.
>>>>>
>>>>> 1. As I mentioned above, in the case of a SELECT * we could output the
>>>>> original bytes, and only use the Beam Row for evaluating the WHERE clause.
>>>>> This might be very expensive though - we risk having to keep two copies of
>>>>> every message around, one in the original Avro format and one in Row format.
>>>>>
>>>>> 2. The other way would be to do what protocol buffers do. We could add
>>>>> one extra field to the inferred Beam schema to store new, unknown fields
>>>>> (probably this would be a map-valued field). This extra field would simply
>>>>> store the raw bytes of these unknown fields, and then when converting back
>>>>> to Avro they would be added to the output message. This might also add some
>>>>> overhead to the pipeline, so might be best to make this behavior opt in.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette <bh...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Reuven, could you clarify what you have in mind? I know multiple
>>>>>> times we've discussed the possibility of adding update compatibility
>>>>>> support to SchemaCoder, including support for certain schema changes (field
>>>>>> additions/deletions) - I think the most recent discussion was here [1].
>>>>>>
>>>>>> But it sounds like Talat is asking for something a little beyond
>>>>>> that, effectively a dynamic schema. Is that something you think we can
>>>>>> support?
>>>>>>
>>>>>> [1]
>>>>>> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>>>>>>
>>>>>> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Thanks. It might be theoretically possible to do this (at least for
>>>>>>> the case where existing fields do not change). Whether anyone currently has
>>>>>>> available time to do this is a different question, but it's something that
>>>>>>> can be looked into.
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <
>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>
>>>>>>>> Adding new fields is more common than modifying existing fields.
>>>>>>>> But type change is also possible for existing fields, such as regular
>>>>>>>> mandatory field(string,integer) to union(nullable field). No field
>>>>>>>> deletion.
>>>>>>>>
>>>>>>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> And when you say schema changes, are these new fields being added
>>>>>>>>> to the schema? Or are you making changes to the existing fields?
>>>>>>>>>
>>>>>>>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> For sure let me explain a little bit about my pipeline.
>>>>>>>>>> My Pipeline is actually simple
>>>>>>>>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn<KV<byte[], byte[]>,
>>>>>>>>>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back
>>>>>>>>>> from Row to Avro (DoFn<Row, byte[]>)-> Write DB/GCS/GRPC etc
>>>>>>>>>>
>>>>>>>>>> On our jobs We have three type sqls
>>>>>>>>>> - SELECT * FROM PCOLLECTION
>>>>>>>>>> - SELECT * FROM PCOLLECTION <with Where Condition>
>>>>>>>>>> - SQL Projection with or without Where clause  SELECT col1, col2
>>>>>>>>>> FROM PCOLLECTION
>>>>>>>>>>
>>>>>>>>>> We know writerSchema for each message. While deserializing avro
>>>>>>>>>> binary we use writer schema and reader schema on Convert Avro Bytes to Beam
>>>>>>>>>> Row step. It always produces a reader schema's generic record and we
>>>>>>>>>> convert that generic record to Row.
>>>>>>>>>> While submitting DF job we use latest schema to generate
>>>>>>>>>> beamSchema.
>>>>>>>>>>
>>>>>>>>>> In the current scenario When we have schema changes first we
>>>>>>>>>> restart all 15k jobs with the latest updated schema then whenever we are
>>>>>>>>>> done we turn on the latest schema for writers. Because of Avro's
>>>>>>>>>> GrammerResolver[1] we read different versions of the schema and we always
>>>>>>>>>> produce the latest schema's record. Without breaking our pipeline we are
>>>>>>>>>> able to handle multiple versions of data in the same streaming pipeline. If
>>>>>>>>>> we can generate SQL's java code when we get notified wirth latest schema we
>>>>>>>>>> will handle all schema changes. The only remaining obstacle is Beam's SQL
>>>>>>>>>> Java code. That's why I am looking for some solution. We dont need multiple
>>>>>>>>>> versions of SQL. We only need to regenerate SQL schema with the latest
>>>>>>>>>> schema on the fly.
>>>>>>>>>>
>>>>>>>>>> I hope I can explain it :)
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can you explain the use case some more? Are you wanting to
>>>>>>>>>>> change your SQL statement as well when the schema changes? If not, what are
>>>>>>>>>>> those new fields doing in the pipeline? What I mean is that your old SQL
>>>>>>>>>>> statement clearly didn't reference those fields in a SELECT statement since
>>>>>>>>>>> they didn't exist, so what are you missing by not having them unless you
>>>>>>>>>>> are also changing the SQL statement?
>>>>>>>>>>>
>>>>>>>>>>> Is this a case where you have a SELECT *, and just want to make
>>>>>>>>>>> sure those fields are included?
>>>>>>>>>>>
>>>>>>>>>>> Reuven
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Andrew,
>>>>>>>>>>>>
>>>>>>>>>>>> I assume SQL query is not going to change. Changing things is
>>>>>>>>>>>> the Row schema by adding new columns or rename columns. if we keep a
>>>>>>>>>>>> version information on somewhere for example a KV pair. Key is schema
>>>>>>>>>>>> information, value is Row. Can not we generate SQL code ? Why I am asking
>>>>>>>>>>>> We have 15k pipelines. When we have a schema change we restart a 15k DF job
>>>>>>>>>>>> which is pain. I am looking for a possible way to avoid job restart. Dont
>>>>>>>>>>>> you think it is not still doable ?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <
>>>>>>>>>>>> apilloud@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Unfortunately we don't have a way to generate the SQL Java
>>>>>>>>>>>>> code on the fly, even if we did, that wouldn't solve your problem. I
>>>>>>>>>>>>> believe our recommended practice is to run both the old and new pipeline
>>>>>>>>>>>>> for some time, then pick a window boundary to transition the output from
>>>>>>>>>>>>> the old pipeline to the new one.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>>>>>>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>>>>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>>>>>>>>>>> schema evolution. They are optimized for performance at all costs.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you worked around this, the Beam model doesn't support
>>>>>>>>>>>>> changing the structure of the pipeline graph. This would significantly
>>>>>>>>>>>>> limit the changes you can make. It would also require some changes to SQL
>>>>>>>>>>>>> to try to produce the same plan for an updated SQL query.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We are using Beamsql on our pipeline. Our Data is written in
>>>>>>>>>>>>>> Avro format. We generate our rows based on our Avro schema. Over time the
>>>>>>>>>>>>>> schema is changing. I believe Beam SQL generates Java code based on what we
>>>>>>>>>>>>>> define as BeamSchema while submitting the pipeline. Do you have any idea
>>>>>>>>>>>>>> How can we handle schema changes with resubmitting our beam job. Is it
>>>>>>>>>>>>>> possible to generate SQL java code on the fly ?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>
>>>>>>>>>>>>>

-- 
Yours Sincerely
Kobe Feng

Re: About Beam SQL Schema Changes and Code generation

Posted by Reuven Lax <re...@google.com>.
Talat, are you interested in writing a proposal and sending it to
dev@beam.apache.org? We could help advise on the options.

Reuven

On Tue, Dec 8, 2020 at 10:28 AM Andrew Pilloud <ap...@google.com> wrote:

> We could support EXPECT statements in proposal 2 as long as we restricted
> it to known fields.
>
> We are getting into implementation details now. Making unknown fields just
> a normal column introduces a number of problems. ZetaSQL doesn't support
> Map type. All our IOs would need to explicitly deal with that special
> column. There would be a lack of consistency between the various types
> (Avro, Proto, Json) which should all support this.
>
> We might also want something even more invasive: everything is an unknown
> field unless it is referenced in the SQL query. All of these options are
> possible. I guess we need someone who has time to work on it to write a
> proposal.
>
> On Tue, Dec 8, 2020 at 10:03 AM Reuven Lax <re...@google.com> wrote:
>
>> I'm not sure that we could support EXCEPT statements, as that would
>> require introspecting the unknown fields (what if the EXCEPT statement
>> matches a field that later is added as an unknown field?). IMO this sort of
>> behavior only makes sense on true pass-through queries. Anything that
>> modifies the input record would be tricky to support.
>>
>> Nested rows would work for proposal 2. You would need to make sure that
>> the unknown-fields map is recursively added to all nested rows, and you
>> would do this when you infer a schema from the avro schema.
>>
>> On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud <ap...@google.com>
>> wrote:
>>
>>> Proposal 1 would also interact poorly with SELECT * EXCEPT ...
>>> statements, which returns all columns except specific ones. Adding an
>>> unknown field does seem like a reasonable way to handle this. It probably
>>> needs to be something that is native to the Row type, so columns added to
>>> nested rows also work.
>>>
>>> Andrew
>>>
>>> On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> There's a difference between a fully dynamic schema and simply being
>>>> able to forward "unknown" fields to the output.
>>>>
>>>> A fully-dynamic schema is not really necessary unless we also had
>>>> dynamic SQL statements. Since the existing SQL statements do not reference
>>>> the new fields by name, there's no reason to add them to the main schema.
>>>>
>>>> However, if you have a SELECT * FROM WHERE XXXX statement that does no
>>>> aggregation, there's fundamentally no reason we couldn't forward the
>>>> messages exactly. In theory we could forward the exact bytes that are in
>>>> the input PCollection, which would necessarily forward the new fields. In
>>>> practice I believe that we convert the input messages to Beam Row objects
>>>> in order to evaluate the WHERE clause, and then convert back to Avro to
>>>> output those messages. I believe this is where we "lose" the unknown
>>>> messages,but this is an implementation artifact - in theory we could output
>>>> the original bytes whenever we see a SELECT *. This is not truly a dynamic
>>>> schema, since you can't really do anything with these extra fields except
>>>> forward them to your output.
>>>>
>>>> I see two possible ways to address this.
>>>>
>>>> 1. As I mentioned above, in the case of a SELECT * we could output the
>>>> original bytes, and only use the Beam Row for evaluating the WHERE clause.
>>>> This might be very expensive though - we risk having to keep two copies of
>>>> every message around, one in the original Avro format and one in Row format.
>>>>
>>>> 2. The other way would be to do what protocol buffers do. We could add
>>>> one extra field to the inferred Beam schema to store new, unknown fields
>>>> (probably this would be a map-valued field). This extra field would simply
>>>> store the raw bytes of these unknown fields, and then when converting back
>>>> to Avro they would be added to the output message. This might also add some
>>>> overhead to the pipeline, so might be best to make this behavior opt in.
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette <bh...@google.com>
>>>> wrote:
>>>>
>>>>> Reuven, could you clarify what you have in mind? I know multiple times
>>>>> we've discussed the possibility of adding update compatibility support to
>>>>> SchemaCoder, including support for certain schema changes (field
>>>>> additions/deletions) - I think the most recent discussion was here [1].
>>>>>
>>>>> But it sounds like Talat is asking for something a little beyond that,
>>>>> effectively a dynamic schema. Is that something you think we can support?
>>>>>
>>>>> [1]
>>>>> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>>>>>
>>>>> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Thanks. It might be theoretically possible to do this (at least for
>>>>>> the case where existing fields do not change). Whether anyone currently has
>>>>>> available time to do this is a different question, but it's something that
>>>>>> can be looked into.
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <
>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>
>>>>>>> Adding new fields is more common than modifying existing fields. But
>>>>>>> type change is also possible for existing fields, such as regular mandatory
>>>>>>> field(string,integer) to union(nullable field). No field deletion.
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> And when you say schema changes, are these new fields being added
>>>>>>>> to the schema? Or are you making changes to the existing fields?
>>>>>>>>
>>>>>>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> For sure let me explain a little bit about my pipeline.
>>>>>>>>> My Pipeline is actually simple
>>>>>>>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn<KV<byte[], byte[]>,
>>>>>>>>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back
>>>>>>>>> from Row to Avro (DoFn<Row, byte[]>)-> Write DB/GCS/GRPC etc
>>>>>>>>>
>>>>>>>>> On our jobs We have three type sqls
>>>>>>>>> - SELECT * FROM PCOLLECTION
>>>>>>>>> - SELECT * FROM PCOLLECTION <with Where Condition>
>>>>>>>>> - SQL Projection with or without Where clause  SELECT col1, col2
>>>>>>>>> FROM PCOLLECTION
>>>>>>>>>
>>>>>>>>> We know writerSchema for each message. While deserializing avro
>>>>>>>>> binary we use writer schema and reader schema on Convert Avro Bytes to Beam
>>>>>>>>> Row step. It always produces a reader schema's generic record and we
>>>>>>>>> convert that generic record to Row.
>>>>>>>>> While submitting DF job we use latest schema to generate
>>>>>>>>> beamSchema.
>>>>>>>>>
>>>>>>>>> In the current scenario When we have schema changes first we
>>>>>>>>> restart all 15k jobs with the latest updated schema then whenever we are
>>>>>>>>> done we turn on the latest schema for writers. Because of Avro's
>>>>>>>>> GrammerResolver[1] we read different versions of the schema and we always
>>>>>>>>> produce the latest schema's record. Without breaking our pipeline we are
>>>>>>>>> able to handle multiple versions of data in the same streaming pipeline. If
>>>>>>>>> we can generate SQL's java code when we get notified wirth latest schema we
>>>>>>>>> will handle all schema changes. The only remaining obstacle is Beam's SQL
>>>>>>>>> Java code. That's why I am looking for some solution. We dont need multiple
>>>>>>>>> versions of SQL. We only need to regenerate SQL schema with the latest
>>>>>>>>> schema on the fly.
>>>>>>>>>
>>>>>>>>> I hope I can explain it :)
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>>>>>>>
>>>>>>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Can you explain the use case some more? Are you wanting to change
>>>>>>>>>> your SQL statement as well when the schema changes? If not, what are those
>>>>>>>>>> new fields doing in the pipeline? What I mean is that your old SQL
>>>>>>>>>> statement clearly didn't reference those fields in a SELECT statement since
>>>>>>>>>> they didn't exist, so what are you missing by not having them unless you
>>>>>>>>>> are also changing the SQL statement?
>>>>>>>>>>
>>>>>>>>>> Is this a case where you have a SELECT *, and just want to make
>>>>>>>>>> sure those fields are included?
>>>>>>>>>>
>>>>>>>>>> Reuven
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Andrew,
>>>>>>>>>>>
>>>>>>>>>>> I assume SQL query is not going to change. Changing things is
>>>>>>>>>>> the Row schema by adding new columns or rename columns. if we keep a
>>>>>>>>>>> version information on somewhere for example a KV pair. Key is schema
>>>>>>>>>>> information, value is Row. Can not we generate SQL code ? Why I am asking
>>>>>>>>>>> We have 15k pipelines. When we have a schema change we restart a 15k DF job
>>>>>>>>>>> which is pain. I am looking for a possible way to avoid job restart. Dont
>>>>>>>>>>> you think it is not still doable ?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <
>>>>>>>>>>> apilloud@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Unfortunately we don't have a way to generate the SQL Java code
>>>>>>>>>>>> on the fly, even if we did, that wouldn't solve your problem. I believe our
>>>>>>>>>>>> recommended practice is to run both the old and new pipeline for some time,
>>>>>>>>>>>> then pick a window boundary to transition the output from the old pipeline
>>>>>>>>>>>> to the new one.
>>>>>>>>>>>>
>>>>>>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>>>>>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>>>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>>>>>>>>>> schema evolution. They are optimized for performance at all costs.
>>>>>>>>>>>>
>>>>>>>>>>>> If you worked around this, the Beam model doesn't support
>>>>>>>>>>>> changing the structure of the pipeline graph. This would significantly
>>>>>>>>>>>> limit the changes you can make. It would also require some changes to SQL
>>>>>>>>>>>> to try to produce the same plan for an updated SQL query.
>>>>>>>>>>>>
>>>>>>>>>>>> Andrew
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> We are using Beamsql on our pipeline. Our Data is written in
>>>>>>>>>>>>> Avro format. We generate our rows based on our Avro schema. Over time the
>>>>>>>>>>>>> schema is changing. I believe Beam SQL generates Java code based on what we
>>>>>>>>>>>>> define as BeamSchema while submitting the pipeline. Do you have any idea
>>>>>>>>>>>>> How can we handle schema changes with resubmitting our beam job. Is it
>>>>>>>>>>>>> possible to generate SQL java code on the fly ?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>

Re: About Beam SQL Schema Changes and Code generation

Posted by Andrew Pilloud <ap...@google.com>.
We could support EXCEPT statements in proposal 2 as long as we restricted
it to known fields.

We are getting into implementation details now. Making unknown fields just
a normal column introduces a number of problems. ZetaSQL doesn't support
Map type. All our IOs would need to explicitly deal with that special
column. There would be a lack of consistency between the various types
(Avro, Proto, Json) which should all support this.

We might also want something even more invasive: everything is an unknown
field unless it is referenced in the SQL query. All of these options are
possible. I guess we need someone who has time to work on it to write a
proposal.
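
For context, the three query shapes mentioned earlier in the thread, which any
unknown-fields design would have to cover (column names are illustrative):

import org.apache.beam.sdk.extensions.sql.SqlTransform;

public class QueryShapes {
  // Pure pass-through: forwarding the original bytes / unknown fields is conceptually safe.
  static final SqlTransform PASS_THROUGH =
      SqlTransform.query("SELECT * FROM PCOLLECTION");

  // Filter only: still pass-through for every row that survives the WHERE clause.
  static final SqlTransform FILTERED =
      SqlTransform.query("SELECT * FROM PCOLLECTION WHERE col1 = 'x'");

  // Projection: only referenced fields are kept, so unknown fields are dropped unless
  // they are carried in an opt-in unknown-fields column as discussed above.
  static final SqlTransform PROJECTION =
      SqlTransform.query("SELECT col1, col2 FROM PCOLLECTION");
}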

On Tue, Dec 8, 2020 at 10:03 AM Reuven Lax <re...@google.com> wrote:

> I'm not sure that we could support EXCEPT statements, as that would
> require introspecting the unknown fields (what if the EXCEPT statement
> matches a field that later is added as an unknown field?). IMO this sort of
> behavior only makes sense on true pass-through queries. Anything that
> modifies the input record would be tricky to support.
>
> Nested rows would work for proposal 2. You would need to make sure that
> the unknown-fields map is recursively added to all nested rows, and you
> would do this when you infer a schema from the avro schema.
>
> On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud <ap...@google.com> wrote:
>
>> Proposal 1 would also interact poorly with SELECT * EXCEPT ...
>> statements, which returns all columns except specific ones. Adding an
>> unknown field does seem like a reasonable way to handle this. It probably
>> needs to be something that is native to the Row type, so columns added to
>> nested rows also work.
>>
>> Andrew
>>
>> On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax <re...@google.com> wrote:
>>
>>> There's a difference between a fully dynamic schema and simply being
>>> able to forward "unknown" fields to the output.
>>>
>>> A fully-dynamic schema is not really necessary unless we also had
>>> dynamic SQL statements. Since the existing SQL statements do not reference
>>> the new fields by name, there's no reason to add them to the main schema.
>>>
>>> However, if you have a SELECT * FROM WHERE XXXX statement that does no
>>> aggregation, there's fundamentally no reason we couldn't forward the
>>> messages exactly. In theory we could forward the exact bytes that are in
>>> the input PCollection, which would necessarily forward the new fields. In
>>> practice I believe that we convert the input messages to Beam Row objects
>>> in order to evaluate the WHERE clause, and then convert back to Avro to
>>> output those messages. I believe this is where we "lose" the unknown
>>> messages,but this is an implementation artifact - in theory we could output
>>> the original bytes whenever we see a SELECT *. This is not truly a dynamic
>>> schema, since you can't really do anything with these extra fields except
>>> forward them to your output.
>>>
>>> I see two possible ways to address this.
>>>
>>> 1. As I mentioned above, in the case of a SELECT * we could output the
>>> original bytes, and only use the Beam Row for evaluating the WHERE clause.
>>> This might be very expensive though - we risk having to keep two copies of
>>> every message around, one in the original Avro format and one in Row format.
>>>
>>> 2. The other way would be to do what protocol buffers do. We could add
>>> one extra field to the inferred Beam schema to store new, unknown fields
>>> (probably this would be a map-valued field). This extra field would simply
>>> store the raw bytes of these unknown fields, and then when converting back
>>> to Avro they would be added to the output message. This might also add some
>>> overhead to the pipeline, so might be best to make this behavior opt in.
>>>
>>> Reuven
>>>
>>> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette <bh...@google.com>
>>> wrote:
>>>
>>>> Reuven, could you clarify what you have in mind? I know multiple times
>>>> we've discussed the possibility of adding update compatibility support to
>>>> SchemaCoder, including support for certain schema changes (field
>>>> additions/deletions) - I think the most recent discussion was here [1].
>>>>
>>>> But it sounds like Talat is asking for something a little beyond that,
>>>> effectively a dynamic schema. Is that something you think we can support?
>>>>
>>>> [1]
>>>> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>>>>
>>>> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Thanks. It might be theoretically possible to do this (at least for
>>>>> the case where existing fields do not change). Whether anyone currently has
>>>>> available time to do this is a different question, but it's something that
>>>>> can be looked into.
>>>>>
>>>>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <
>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>
>>>>>> Adding new fields is more common than modifying existing fields. But
>>>>>> type change is also possible for existing fields, such as regular mandatory
>>>>>> field(string,integer) to union(nullable field). No field deletion.
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> And when you say schema changes, are these new fields being added to
>>>>>>> the schema? Or are you making changes to the existing fields?
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> For sure let me explain a little bit about my pipeline.
>>>>>>>> My Pipeline is actually simple
>>>>>>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn<KV<byte[], byte[]>,
>>>>>>>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back
>>>>>>>> from Row to Avro (DoFn<Row, byte[]>)-> Write DB/GCS/GRPC etc
>>>>>>>>
>>>>>>>> On our jobs we have three types of SQL:
>>>>>>>> - SELECT * FROM PCOLLECTION
>>>>>>>> - SELECT * FROM PCOLLECTION <with Where Condition>
>>>>>>>> - SQL Projection with or without Where clause  SELECT col1, col2
>>>>>>>> FROM PCOLLECTION
>>>>>>>>
>>>>>>>> We know writerSchema for each message. While deserializing avro
>>>>>>>> binary we use writer schema and reader schema on Convert Avro Bytes to Beam
>>>>>>>> Row step. It always produces a reader schema's generic record and we
>>>>>>>> convert that generic record to Row.
>>>>>>>> While submitting DF job we use latest schema to generate beamSchema.
>>>>>>>>
>>>>>>>> In the current scenario, when we have schema changes, first we restart
>>>>>>>> all 15k jobs with the latest updated schema, then whenever we are done we
>>>>>>>> turn on the latest schema for writers. Because of Avro's GrammarResolver[1]
>>>>>>>> we read different versions of the schema and we always produce the latest
>>>>>>>> schema's record. Without breaking our pipeline we are able to handle
>>>>>>>> multiple versions of data in the same streaming pipeline. If we can
>>>>>>>> generate the SQL's Java code when we get notified with the latest schema,
>>>>>>>> we will handle all schema changes. The only remaining obstacle is Beam's
>>>>>>>> SQL Java code. That's why I am looking for some solution. We don't need
>>>>>>>> multiple versions of SQL. We only need to regenerate the SQL schema with
>>>>>>>> the latest schema on the fly.
>>>>>>>>
>>>>>>>> I hope I can explain it :)
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__avro.apache.org_docs_1.7.2_api_java_org_apache_avro_io_parsing_doc-2Dfiles_parsing.html&d=DwMFaQ&c=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo&r=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g&m=0qahAe7vDisJq_hMYGY8F-Bp7-_5lOwOKzNoQ3r3-IQ&s=lwwIMsJO9nmj6_xZcSG_7qkBIaxOwyUXry4st1q70Rc&e=>
>>>>>>>>
>>>>>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Can you explain the use case some more? Are you wanting to change
>>>>>>>>> your SQL statement as well when the schema changes? If not, what are those
>>>>>>>>> new fields doing in the pipeline? What I mean is that your old SQL
>>>>>>>>> statement clearly didn't reference those fields in a SELECT statement since
>>>>>>>>> they didn't exist, so what are you missing by not having them unless you
>>>>>>>>> are also changing the SQL statement?
>>>>>>>>>
>>>>>>>>> Is this a case where you have a SELECT *, and just want to make
>>>>>>>>> sure those fields are included?
>>>>>>>>>
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Andrew,
>>>>>>>>>>
>>>>>>>>>> I assume SQL query is not going to change. Changing things is the
>>>>>>>>>> Row schema by adding new columns or rename columns. if we keep a version
>>>>>>>>>> information on somewhere for example a KV pair. Key is schema information,
>>>>>>>>>> value is Row. Can not we generate SQL code ? Why I am asking We have 15k
>>>>>>>>>> pipelines. When we have a schema change we restart a 15k DF job which is
>>>>>>>>>> pain. I am looking for a possible way to avoid job restart. Dont you think
>>>>>>>>>> it is not still doable ?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <
>>>>>>>>>> apilloud@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Unfortunately we don't have a way to generate the SQL Java code
>>>>>>>>>>> on the fly, even if we did, that wouldn't solve your problem. I believe our
>>>>>>>>>>> recommended practice is to run both the old and new pipeline for some time,
>>>>>>>>>>> then pick a window boundary to transition the output from the old pipeline
>>>>>>>>>>> to the new one.
>>>>>>>>>>>
>>>>>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>>>>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>>>>>>>>> schema evolution. They are optimized for performance at all costs.
>>>>>>>>>>>
>>>>>>>>>>> If you worked around this, the Beam model doesn't support
>>>>>>>>>>> changing the structure of the pipeline graph. This would significantly
>>>>>>>>>>> limit the changes you can make. It would also require some changes to SQL
>>>>>>>>>>> to try to produce the same plan for an updated SQL query.
>>>>>>>>>>>
>>>>>>>>>>> Andrew
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> We are using Beamsql on our pipeline. Our Data is written in
>>>>>>>>>>>> Avro format. We generate our rows based on our Avro schema. Over time the
>>>>>>>>>>>> schema is changing. I believe Beam SQL generates Java code based on what we
>>>>>>>>>>>> define as BeamSchema while submitting the pipeline. Do you have any idea
>>>>>>>>>>>> How can we handle schema changes with resubmitting our beam job. Is it
>>>>>>>>>>>> possible to generate SQL java code on the fly ?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>

Re: About Beam SQL Schema Changes and Code generation

Posted by Reuven Lax <re...@google.com>.
I'm not sure that we could support EXCEPT statements, as that would require
introspecting the unknown fields (what if the EXCEPT statement matches a
field that later is added as an unknown field?). IMO this sort of behavior
only makes sense on true pass-through queries. Anything that modifies the
input record would be tricky to support.

Nested rows would work for proposal 2. You would need to make sure that the
unknown-fields map is recursively added to all nested rows, and you would
do this when you infer a schema from the Avro schema.
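
Very roughly, something like the sketch below is what I have in mind (the
field name and the recursion are just illustrative, this is not an existing
Beam API, and arrays/iterables of rows would need the same treatment):

    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.Schema.Field;
    import org.apache.beam.sdk.schemas.Schema.FieldType;

    // Hypothetical: after inferring a Beam schema from the Avro schema,
    // append an unknown-fields map at every row level, recursively.
    static Schema withUnknownFields(Schema inferred) {
      Schema.Builder builder = Schema.builder();
      for (Field field : inferred.getFields()) {
        FieldType type = field.getType();
        if (type.getTypeName().isCompositeType()) {
          // Nested row: recurse so unknown nested fields survive too.
          type = FieldType.row(withUnknownFields(type.getRowSchema()))
                     .withNullable(type.getNullable());
        }
        builder.addField(field.getName(), type);
      }
      return builder
          .addMapField("unknown_fields", FieldType.STRING, FieldType.BYTES)
          .build();
    }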

On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud <ap...@google.com> wrote:

> Proposal 1 would also interact poorly with SELECT * EXCEPT ... statements,
> which returns all columns except specific ones. Adding an unknown field
> does seem like a reasonable way to handle this. It probably needs to be
> something that is native to the Row type, so columns added to nested rows
> also work.
>
> Andrew
>
> On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax <re...@google.com> wrote:
>
>> There's a difference between a fully dynamic schema and simply being able
>> to forward "unknown" fields to the output.
>>
>> A fully-dynamic schema is not really necessary unless we also had dynamic
>> SQL statements. Since the existing SQL statements do not reference the new
>> fields by name, there's no reason to add them to the main schema.
>>
>> However, if you have a SELECT * FROM WHERE XXXX statement that does no
>> aggregation, there's fundamentally no reason we couldn't forward the
>> messages exactly. In theory we could forward the exact bytes that are in
>> the input PCollection, which would necessarily forward the new fields. In
>> practice I believe that we convert the input messages to Beam Row objects
>> in order to evaluate the WHERE clause, and then convert back to Avro to
>> output those messages. I believe this is where we "lose" the unknown
>> messages, but this is an implementation artifact - in theory we could output
>> the original bytes whenever we see a SELECT *. This is not truly a dynamic
>> schema, since you can't really do anything with these extra fields except
>> forward them to your output.
>>
>> I see two possible ways to address this.
>>
>> 1. As I mentioned above, in the case of a SELECT * we could output the
>> original bytes, and only use the Beam Row for evaluating the WHERE clause.
>> This might be very expensive though - we risk having to keep two copies of
>> every message around, one in the original Avro format and one in Row format.
>>
>> 2. The other way would be to do what protocol buffers do. We could add
>> one extra field to the inferred Beam schema to store new, unknown fields
>> (probably this would be a map-valued field). This extra field would simply
>> store the raw bytes of these unknown fields, and then when converting back
>> to Avro they would be added to the output message. This might also add some
>> overhead to the pipeline, so might be best to make this behavior opt in.
>>
>> Reuven
>>
>> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette <bh...@google.com> wrote:
>>
>>> Reuven, could you clarify what you have in mind? I know multiple times
>>> we've discussed the possibility of adding update compatibility support to
>>> SchemaCoder, including support for certain schema changes (field
>>> additions/deletions) - I think the most recent discussion was here [1].
>>>
>>> But it sounds like Talat is asking for something a little beyond that,
>>> effectively a dynamic schema. Is that something you think we can support?
>>>
>>> [1]
>>> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>>>
>>> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Thanks. It might be theoretically possible to do this (at least for the
>>>> case where existing fields do not change). Whether anyone currently has
>>>> available time to do this is a different question, but it's something that
>>>> can be looked into.
>>>>
>>>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <
>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>
>>>>> Adding new fields is more common than modifying existing fields. But
>>>>> type change is also possible for existing fields, such as regular mandatory
>>>>> field(string,integer) to union(nullable field). No field deletion.
>>>>>
>>>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> And when you say schema changes, are these new fields being added to
>>>>>> the schema? Or are you making changes to the existing fields?
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> For sure let me explain a little bit about my pipeline.
>>>>>>> My Pipeline is actually simple
>>>>>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn<KV<byte[], byte[]>,
>>>>>>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from
>>>>>>> Row to Avro (DoFn<Row, byte[]>)-> Write DB/GCS/GRPC etc
>>>>>>>
>>>>>>> On our jobs we have three types of SQL:
>>>>>>> - SELECT * FROM PCOLLECTION
>>>>>>> - SELECT * FROM PCOLLECTION <with Where Condition>
>>>>>>> - SQL Projection with or without Where clause  SELECT col1, col2
>>>>>>> FROM PCOLLECTION
>>>>>>>
>>>>>>> We know writerSchema for each message. While deserializing avro
>>>>>>> binary we use writer schema and reader schema on Convert Avro Bytes to Beam
>>>>>>> Row step. It always produces a reader schema's generic record and we
>>>>>>> convert that generic record to Row.
>>>>>>> While submitting DF job we use latest schema to generate beamSchema.
>>>>>>>
>>>>>>> In the current scenario, when we have schema changes, first we restart
>>>>>>> all 15k jobs with the latest updated schema, then whenever we are done we
>>>>>>> turn on the latest schema for writers. Because of Avro's GrammarResolver[1]
>>>>>>> we read different versions of the schema and we always produce the latest
>>>>>>> schema's record. Without breaking our pipeline we are able to handle
>>>>>>> multiple versions of data in the same streaming pipeline. If we can
>>>>>>> generate the SQL's Java code when we get notified with the latest schema,
>>>>>>> we will handle all schema changes. The only remaining obstacle is Beam's
>>>>>>> SQL Java code. That's why I am looking for some solution. We don't need
>>>>>>> multiple versions of SQL. We only need to regenerate the SQL schema with
>>>>>>> the latest schema on the fly.
>>>>>>>
>>>>>>> I hope I can explain it :)
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> [1]
>>>>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__avro.apache.org_docs_1.7.2_api_java_org_apache_avro_io_parsing_doc-2Dfiles_parsing.html&d=DwMFaQ&c=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo&r=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g&m=0qahAe7vDisJq_hMYGY8F-Bp7-_5lOwOKzNoQ3r3-IQ&s=lwwIMsJO9nmj6_xZcSG_7qkBIaxOwyUXry4st1q70Rc&e=>
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Can you explain the use case some more? Are you wanting to change
>>>>>>>> your SQL statement as well when the schema changes? If not, what are those
>>>>>>>> new fields doing in the pipeline? What I mean is that your old SQL
>>>>>>>> statement clearly didn't reference those fields in a SELECT statement since
>>>>>>>> they didn't exist, so what are you missing by not having them unless you
>>>>>>>> are also changing the SQL statement?
>>>>>>>>
>>>>>>>> Is this a case where you have a SELECT *, and just want to make
>>>>>>>> sure those fields are included?
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Andrew,
>>>>>>>>>
>>>>>>>>> I assume SQL query is not going to change. Changing things is the
>>>>>>>>> Row schema by adding new columns or rename columns. if we keep a version
>>>>>>>>> information on somewhere for example a KV pair. Key is schema information,
>>>>>>>>> value is Row. Can not we generate SQL code ? Why I am asking We have 15k
>>>>>>>>> pipelines. When we have a schema change we restart a 15k DF job which is
>>>>>>>>> pain. I am looking for a possible way to avoid job restart. Dont you think
>>>>>>>>> it is not still doable ?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <ap...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Unfortunately we don't have a way to generate the SQL Java code
>>>>>>>>>> on the fly, even if we did, that wouldn't solve your problem. I believe our
>>>>>>>>>> recommended practice is to run both the old and new pipeline for some time,
>>>>>>>>>> then pick a window boundary to transition the output from the old pipeline
>>>>>>>>>> to the new one.
>>>>>>>>>>
>>>>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>>>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>>>>>>>> schema evolution. They are optimized for performance at all costs.
>>>>>>>>>>
>>>>>>>>>> If you worked around this, the Beam model doesn't support
>>>>>>>>>> changing the structure of the pipeline graph. This would significantly
>>>>>>>>>> limit the changes you can make. It would also require some changes to SQL
>>>>>>>>>> to try to produce the same plan for an updated SQL query.
>>>>>>>>>>
>>>>>>>>>> Andrew
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> We are using Beamsql on our pipeline. Our Data is written in
>>>>>>>>>>> Avro format. We generate our rows based on our Avro schema. Over time the
>>>>>>>>>>> schema is changing. I believe Beam SQL generates Java code based on what we
>>>>>>>>>>> define as BeamSchema while submitting the pipeline. Do you have any idea
>>>>>>>>>>> How can we handle schema changes with resubmitting our beam job. Is it
>>>>>>>>>>> possible to generate SQL java code on the fly ?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>

Re: About Beam SQL Schema Changes and Code generation

Posted by Andrew Pilloud <ap...@google.com>.
Proposal 1 would also interact poorly with SELECT * EXCEPT ... statements,
which return all columns except specific ones. Adding an unknown field
does seem like a reasonable way to handle this. It probably needs to be
something that is native to the Row type, so columns added to nested rows
also work.
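
(For reference, that's the ZetaSQL-style form sketched below; the column
name is made up.)

    SELECT * EXCEPT (debug_info) FROM PCOLLECTION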

Andrew

On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax <re...@google.com> wrote:

> There's a difference between a fully dynamic schema and simply being able
> to forward "unknown" fields to the output.
>
> A fully-dynamic schema is not really necessary unless we also had dynamic
> SQL statements. Since the existing SQL statements do not reference the new
> fields by name, there's no reason to add them to the main schema.
>
> However, if you have a SELECT * FROM WHERE XXXX statement that does no
> aggregation, there's fundamentally no reason we couldn't forward the
> messages exactly. In theory we could forward the exact bytes that are in
> the input PCollection, which would necessarily forward the new fields. In
> practice I believe that we convert the input messages to Beam Row objects
> in order to evaluate the WHERE clause, and then convert back to Avro to
> output those messages. I believe this is where we "lose" the unknown
> messages, but this is an implementation artifact - in theory we could output
> the original bytes whenever we see a SELECT *. This is not truly a dynamic
> schema, since you can't really do anything with these extra fields except
> forward them to your output.
>
> I see two possible ways to address this.
>
> 1. As I mentioned above, in the case of a SELECT * we could output the
> original bytes, and only use the Beam Row for evaluating the WHERE clause.
> This might be very expensive though - we risk having to keep two copies of
> every message around, one in the original Avro format and one in Row format.
>
> 2. The other way would be to do what protocol buffers do. We could add one
> extra field to the inferred Beam schema to store new, unknown fields
> (probably this would be a map-valued field). This extra field would simply
> store the raw bytes of these unknown fields, and then when converting back
> to Avro they would be added to the output message. This might also add some
> overhead to the pipeline, so might be best to make this behavior opt in.
>
> Reuven
>
> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette <bh...@google.com> wrote:
>
>> Reuven, could you clarify what you have in mind? I know multiple times
>> we've discussed the possibility of adding update compatibility support to
>> SchemaCoder, including support for certain schema changes (field
>> additions/deletions) - I think the most recent discussion was here [1].
>>
>> But it sounds like Talat is asking for something a little beyond that,
>> effectively a dynamic schema. Is that something you think we can support?
>>
>> [1]
>> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>>
>> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Thanks. It might be theoretically possible to do this (at least for the
>>> case where existing fields do not change). Whether anyone currently has
>>> available time to do this is a different question, but it's something that
>>> can be looked into.
>>>
>>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <
>>> tuyarer@paloaltonetworks.com> wrote:
>>>
>>>> Adding new fields is more common than modifying existing fields. But
>>>> type change is also possible for existing fields, such as regular mandatory
>>>> field(string,integer) to union(nullable field). No field deletion.
>>>>
>>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> And when you say schema changes, are these new fields being added to
>>>>> the schema? Or are you making changes to the existing fields?
>>>>>
>>>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> For sure let me explain a little bit about my pipeline.
>>>>>> My Pipeline is actually simple
>>>>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn<KV<byte[], byte[]>,
>>>>>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from
>>>>>> Row to Avro (DoFn<Row, byte[]>)-> Write DB/GCS/GRPC etc
>>>>>>
>>>>>> On our jobs we have three types of SQL:
>>>>>> - SELECT * FROM PCOLLECTION
>>>>>> - SELECT * FROM PCOLLECTION <with Where Condition>
>>>>>> - SQL Projection with or without Where clause  SELECT col1, col2 FROM
>>>>>> PCOLLECTION
>>>>>>
>>>>>> We know writerSchema for each message. While deserializing avro
>>>>>> binary we use writer schema and reader schema on Convert Avro Bytes to Beam
>>>>>> Row step. It always produces a reader schema's generic record and we
>>>>>> convert that generic record to Row.
>>>>>> While submitting DF job we use latest schema to generate beamSchema.
>>>>>>
>>>>>> In the current scenario, when we have schema changes, first we restart
>>>>>> all 15k jobs with the latest updated schema, then whenever we are done we
>>>>>> turn on the latest schema for writers. Because of Avro's GrammarResolver[1]
>>>>>> we read different versions of the schema and we always produce the latest
>>>>>> schema's record. Without breaking our pipeline we are able to handle
>>>>>> multiple versions of data in the same streaming pipeline. If we can
>>>>>> generate the SQL's Java code when we get notified with the latest schema,
>>>>>> we will handle all schema changes. The only remaining obstacle is Beam's
>>>>>> SQL Java code. That's why I am looking for some solution. We don't need
>>>>>> multiple versions of SQL. We only need to regenerate the SQL schema with
>>>>>> the latest schema on the fly.
>>>>>>
>>>>>> I hope I can explain it :)
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> [1]
>>>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__avro.apache.org_docs_1.7.2_api_java_org_apache_avro_io_parsing_doc-2Dfiles_parsing.html&d=DwMFaQ&c=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo&r=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g&m=0qahAe7vDisJq_hMYGY8F-Bp7-_5lOwOKzNoQ3r3-IQ&s=lwwIMsJO9nmj6_xZcSG_7qkBIaxOwyUXry4st1q70Rc&e=>
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Can you explain the use case some more? Are you wanting to change
>>>>>>> your SQL statement as well when the schema changes? If not, what are those
>>>>>>> new fields doing in the pipeline? What I mean is that your old SQL
>>>>>>> statement clearly didn't reference those fields in a SELECT statement since
>>>>>>> they didn't exist, so what are you missing by not having them unless you
>>>>>>> are also changing the SQL statement?
>>>>>>>
>>>>>>> Is this a case where you have a SELECT *, and just want to make sure
>>>>>>> those fields are included?
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>
>>>>>>>> Hi Andrew,
>>>>>>>>
>>>>>>>> I assume SQL query is not going to change. Changing things is the
>>>>>>>> Row schema by adding new columns or rename columns. if we keep a version
>>>>>>>> information on somewhere for example a KV pair. Key is schema information,
>>>>>>>> value is Row. Can not we generate SQL code ? Why I am asking We have 15k
>>>>>>>> pipelines. When we have a schema change we restart a 15k DF job which is
>>>>>>>> pain. I am looking for a possible way to avoid job restart. Dont you think
>>>>>>>> it is not still doable ?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <ap...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Unfortunately we don't have a way to generate the SQL Java code on
>>>>>>>>> the fly, even if we did, that wouldn't solve your problem. I believe our
>>>>>>>>> recommended practice is to run both the old and new pipeline for some time,
>>>>>>>>> then pick a window boundary to transition the output from the old pipeline
>>>>>>>>> to the new one.
>>>>>>>>>
>>>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>>>>>>> schema evolution. They are optimized for performance at all costs.
>>>>>>>>>
>>>>>>>>> If you worked around this, the Beam model doesn't support changing
>>>>>>>>> the structure of the pipeline graph. This would significantly limit the
>>>>>>>>> changes you can make. It would also require some changes to SQL to try to
>>>>>>>>> produce the same plan for an updated SQL query.
>>>>>>>>>
>>>>>>>>> Andrew
>>>>>>>>>
>>>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> We are using Beamsql on our pipeline. Our Data is written in Avro
>>>>>>>>>> format. We generate our rows based on our Avro schema. Over time the schema
>>>>>>>>>> is changing. I believe Beam SQL generates Java code based on what we define
>>>>>>>>>> as BeamSchema while submitting the pipeline. Do you have any idea How can
>>>>>>>>>> we handle schema changes with resubmitting our beam job. Is it possible to
>>>>>>>>>> generate SQL java code on the fly ?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>

Re: About Beam SQL Schema Changes and Code generation

Posted by Reuven Lax <re...@google.com>.
There's a difference between a fully dynamic schema and simply being able
to forward "unknown" fields to the output.

A fully-dynamic schema is not really necessary unless we also had dynamic
SQL statements. Since the existing SQL statements do not reference the new
fields by name, there's no reason to add them to the main schema.

However, if you have a SELECT * FROM ... WHERE XXXX statement that does no
aggregation, there's fundamentally no reason we couldn't forward the
messages exactly. In theory we could forward the exact bytes that are in
the input PCollection, which would necessarily forward the new fields. In
practice I believe that we convert the input messages to Beam Row objects
in order to evaluate the WHERE clause, and then convert back to Avro to
output those messages. I believe this is where we "lose" the unknown
fields, but this is an implementation artifact - in theory we could output
the original bytes whenever we see a SELECT *. This is not truly a dynamic
schema, since you can't really do anything with these extra fields except
forward them to your output.
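
To spell that out, the pipeline Talat described is roughly the shape below
today, and the first conversion is where anything outside the compiled-in
schema disappears (AvroBytesToRow and RowToAvroBytes are stand-ins for his
existing conversion DoFns, and avroBytes is the PCollection<byte[]> read
from Kafka):

    import org.apache.beam.sdk.extensions.sql.SqlTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    // Simplified sketch of the current round trip.
    PCollection<Row> rows =
        avroBytes
            .apply(ParDo.of(new AvroBytesToRow(readerSchema, beamSchema)))
            .setRowSchema(beamSchema);  // fields not in beamSchema are already gone here

    PCollection<Row> filtered =
        rows.apply(SqlTransform.query("SELECT * FROM PCOLLECTION WHERE ..."));

    PCollection<byte[]> output =
        filtered.apply(ParDo.of(new RowToAvroBytes(latestAvroSchema)));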

I see two possible ways to address this.

1. As I mentioned above, in the case of a SELECT * we could output the
original bytes, and only use the Beam Row for evaluating the WHERE clause.
This might be very expensive though - we risk having to keep two copies of
every message around, one in the original Avro format and one in Row format.

2. The other way would be to do what protocol buffers do. We could add one
extra field to the inferred Beam schema to store new, unknown fields
(probably this would be a map-valued field). This extra field would simply
store the raw bytes of these unknown fields, and then when converting back
to Avro they would be added to the output message. This might also add some
overhead to the pipeline, so might be best to make this behavior opt in.
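
To make option 2 a bit more concrete, the Avro-to-Row conversion could stash
anything the compiled-in schema doesn't know about, roughly like the sketch
below. The field name and the convertValue/encodeAvroValue helpers are
placeholders for the existing conversion code, and it assumes the map field
was appended as the last field of the Beam schema:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.values.Row;

    static Row toRowKeepingUnknowns(GenericRecord record, Schema beamSchema) {
      Row.Builder builder = Row.withSchema(beamSchema);

      // Known fields, in Beam schema order; convertValue is the existing
      // Avro-value-to-Row-value conversion.
      for (Schema.Field field : beamSchema.getFields()) {
        if (!field.getName().equals("unknown_fields")) {
          builder.addValue(convertValue(record.get(field.getName()), field.getType()));
        }
      }

      // Anything present in the record but not in the Beam schema is kept as
      // raw bytes so it can be re-attached when converting back to Avro.
      Map<String, byte[]> unknown = new HashMap<>();
      for (org.apache.avro.Schema.Field avroField : record.getSchema().getFields()) {
        if (!beamSchema.hasField(avroField.name())) {
          unknown.put(avroField.name(), encodeAvroValue(record, avroField));
        }
      }
      return builder.addValue(unknown).build();
    }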

Reuven

On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette <bh...@google.com> wrote:

> Reuven, could you clarify what you have in mind? I know multiple times
> we've discussed the possibility of adding update compatibility support to
> SchemaCoder, including support for certain schema changes (field
> additions/deletions) - I think the most recent discussion was here [1].
>
> But it sounds like Talat is asking for something a little beyond that,
> effectively a dynamic schema. Is that something you think we can support?
>
> [1]
> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>
> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:
>
>> Thanks. It might be theoretically possible to do this (at least for the
>> case where existing fields do not change). Whether anyone currently has
>> available time to do this is a different question, but it's something that
>> can be looked into.
>>
>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <tu...@paloaltonetworks.com>
>> wrote:
>>
>>> Adding new fields is more common than modifying existing fields. But
>>> type change is also possible for existing fields, such as regular mandatory
>>> field(string,integer) to union(nullable field). No field deletion.
>>>
>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> And when you say schema changes, are these new fields being added to
>>>> the schema? Or are you making changes to the existing fields?
>>>>
>>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi,
>>>>> For sure let me explain a little bit about my pipeline.
>>>>> My Pipeline is actually simple
>>>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn<KV<byte[], byte[]>,
>>>>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from
>>>>> Row to Avro (DoFn<Row, byte[]>)-> Write DB/GCS/GRPC etc
>>>>>
>>>>> On our jobs we have three types of SQL:
>>>>> - SELECT * FROM PCOLLECTION
>>>>> - SELECT * FROM PCOLLECTION <with Where Condition>
>>>>> - SQL Projection with or without Where clause  SELECT col1, col2 FROM
>>>>> PCOLLECTION
>>>>>
>>>>> We know writerSchema for each message. While deserializing avro binary
>>>>> we use writer schema and reader schema on Convert Avro Bytes to Beam Row
>>>>> step. It always produces a reader schema's generic record and we convert
>>>>> that generic record to Row.
>>>>> While submitting DF job we use latest schema to generate beamSchema.
>>>>>
>>>>> In the current scenario, when we have schema changes, first we restart
>>>>> all 15k jobs with the latest updated schema, then whenever we are done we
>>>>> turn on the latest schema for writers. Because of Avro's GrammarResolver[1]
>>>>> we read different versions of the schema and we always produce the latest
>>>>> schema's record. Without breaking our pipeline we are able to handle
>>>>> multiple versions of data in the same streaming pipeline. If we can
>>>>> generate the SQL's Java code when we get notified with the latest schema,
>>>>> we will handle all schema changes. The only remaining obstacle is Beam's
>>>>> SQL Java code. That's why I am looking for some solution. We don't need
>>>>> multiple versions of SQL. We only need to regenerate the SQL schema with
>>>>> the latest schema on the fly.
>>>>>
>>>>> I hope I can explain it :)
>>>>>
>>>>> Thanks
>>>>>
>>>>> [1]
>>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__avro.apache.org_docs_1.7.2_api_java_org_apache_avro_io_parsing_doc-2Dfiles_parsing.html&d=DwMFaQ&c=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo&r=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g&m=0qahAe7vDisJq_hMYGY8F-Bp7-_5lOwOKzNoQ3r3-IQ&s=lwwIMsJO9nmj6_xZcSG_7qkBIaxOwyUXry4st1q70Rc&e=>
>>>>>
>>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Can you explain the use case some more? Are you wanting to change
>>>>>> your SQL statement as well when the schema changes? If not, what are those
>>>>>> new fields doing in the pipeline? What I mean is that your old SQL
>>>>>> statement clearly didn't reference those fields in a SELECT statement since
>>>>>> they didn't exist, so what are you missing by not having them unless you
>>>>>> are also changing the SQL statement?
>>>>>>
>>>>>> Is this a case where you have a SELECT *, and just want to make sure
>>>>>> those fields are included?
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>
>>>>>>> Hi Andrew,
>>>>>>>
>>>>>>> I assume SQL query is not going to change. Changing things is the
>>>>>>> Row schema by adding new columns or rename columns. if we keep a version
>>>>>>> information on somewhere for example a KV pair. Key is schema information,
>>>>>>> value is Row. Can not we generate SQL code ? Why I am asking We have 15k
>>>>>>> pipelines. When we have a schema change we restart a 15k DF job which is
>>>>>>> pain. I am looking for a possible way to avoid job restart. Dont you think
>>>>>>> it is not still doable ?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <ap...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Unfortunately we don't have a way to generate the SQL Java code on
>>>>>>>> the fly, even if we did, that wouldn't solve your problem. I believe our
>>>>>>>> recommended practice is to run both the old and new pipeline for some time,
>>>>>>>> then pick a window boundary to transition the output from the old pipeline
>>>>>>>> to the new one.
>>>>>>>>
>>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>>>>>> schema evolution. They are optimized for performance at all costs.
>>>>>>>>
>>>>>>>> If you worked around this, the Beam model doesn't support changing
>>>>>>>> the structure of the pipeline graph. This would significantly limit the
>>>>>>>> changes you can make. It would also require some changes to SQL to try to
>>>>>>>> produce the same plan for an updated SQL query.
>>>>>>>>
>>>>>>>> Andrew
>>>>>>>>
>>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> We are using Beamsql on our pipeline. Our Data is written in Avro
>>>>>>>>> format. We generate our rows based on our Avro schema. Over time the schema
>>>>>>>>> is changing. I believe Beam SQL generates Java code based on what we define
>>>>>>>>> as BeamSchema while submitting the pipeline. Do you have any idea How can
>>>>>>>>> we handle schema changes with resubmitting our beam job. Is it possible to
>>>>>>>>> generate SQL java code on the fly ?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>

Re: About Beam SQL Schema Changes and Code generation

Posted by Brian Hulette <bh...@google.com>.
Reuven, could you clarify what you have in mind? I know multiple times
we've discussed the possibility of adding update compatibility support to
SchemaCoder, including support for certain schema changes (field
additions/deletions) - I think the most recent discussion was here [1].

But it sounds like Talat is asking for something a little beyond that,
effectively a dynamic schema. Is that something you think we can support?

[1]
https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E

On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:

> Thanks. It might be theoretically possible to do this (at least for the
> case where existing fields do not change). Whether anyone currently has
> available time to do this is a different question, but it's something that
> can be looked into.
>
> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <tu...@paloaltonetworks.com>
> wrote:
>
>> Adding new fields is more common than modifying existing fields. But type
>> change is also possible for existing fields, such as regular mandatory
>> field(string,integer) to union(nullable field). No field deletion.
>>
>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>>
>>> And when you say schema changes, are these new fields being added to the
>>> schema? Or are you making changes to the existing fields?
>>>
>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>> tuyarer@paloaltonetworks.com> wrote:
>>>
>>>> Hi,
>>>> For sure let me explain a little bit about my pipeline.
>>>> My Pipeline is actually simple
>>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn<KV<byte[], byte[]>,
>>>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from
>>>> Row to Avro (DoFn<Row, byte[]>)-> Write DB/GCS/GRPC etc
>>>>
>>>> On our jobs we have three types of SQL:
>>>> - SELECT * FROM PCOLLECTION
>>>> - SELECT * FROM PCOLLECTION <with Where Condition>
>>>> - SQL Projection with or without Where clause  SELECT col1, col2 FROM
>>>> PCOLLECTION
>>>>
>>>> We know writerSchema for each message. While deserializing avro binary
>>>> we use writer schema and reader schema on Convert Avro Bytes to Beam Row
>>>> step. It always produces a reader schema's generic record and we convert
>>>> that generic record to Row.
>>>> While submitting DF job we use latest schema to generate beamSchema.
>>>>
>>>> In the current scenario, when we have schema changes, first we restart
>>>> all 15k jobs with the latest updated schema, then whenever we are done we
>>>> turn on the latest schema for writers. Because of Avro's GrammarResolver[1]
>>>> we read different versions of the schema and we always produce the latest
>>>> schema's record. Without breaking our pipeline we are able to handle
>>>> multiple versions of data in the same streaming pipeline. If we can
>>>> generate the SQL's Java code when we get notified with the latest schema,
>>>> we will handle all schema changes. The only remaining obstacle is Beam's
>>>> SQL Java code. That's why I am looking for some solution. We don't need
>>>> multiple versions of SQL. We only need to regenerate the SQL schema with
>>>> the latest schema on the fly.
>>>>
>>>> I hope I can explain it :)
>>>>
>>>> Thanks
>>>>
>>>> [1]
>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__avro.apache.org_docs_1.7.2_api_java_org_apache_avro_io_parsing_doc-2Dfiles_parsing.html&d=DwMFaQ&c=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo&r=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g&m=0qahAe7vDisJq_hMYGY8F-Bp7-_5lOwOKzNoQ3r3-IQ&s=lwwIMsJO9nmj6_xZcSG_7qkBIaxOwyUXry4st1q70Rc&e=>
>>>>
>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Can you explain the use case some more? Are you wanting to change your
>>>>> SQL statement as well when the schema changes? If not, what are those new
>>>>> fields doing in the pipeline? What I mean is that your old SQL statement
>>>>> clearly didn't reference those fields in a SELECT statement since they
>>>>> didn't exist, so what are you missing by not having them unless you are
>>>>> also changing the SQL statement?
>>>>>
>>>>> Is this a case where you have a SELECT *, and just want to make sure
>>>>> those fields are included?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>
>>>>>> Hi Andrew,
>>>>>>
>>>>>> I assume SQL query is not going to change. Changing things is the Row
>>>>>> schema by adding new columns or rename columns. if we keep a version
>>>>>> information on somewhere for example a KV pair. Key is schema information,
>>>>>> value is Row. Can not we generate SQL code ? Why I am asking We have 15k
>>>>>> pipelines. When we have a schema change we restart a 15k DF job which is
>>>>>> pain. I am looking for a possible way to avoid job restart. Dont you think
>>>>>> it is not still doable ?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <ap...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Unfortunately we don't have a way to generate the SQL Java code on
>>>>>>> the fly, even if we did, that wouldn't solve your problem. I believe our
>>>>>>> recommended practice is to run both the old and new pipeline for some time,
>>>>>>> then pick a window boundary to transition the output from the old pipeline
>>>>>>> to the new one.
>>>>>>>
>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>>>>> schema evolution. They are optimized for performance at all costs.
>>>>>>>
>>>>>>> If you worked around this, the Beam model doesn't support changing
>>>>>>> the structure of the pipeline graph. This would significantly limit the
>>>>>>> changes you can make. It would also require some changes to SQL to try to
>>>>>>> produce the same plan for an updated SQL query.
>>>>>>>
>>>>>>> Andrew
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> We are using Beamsql on our pipeline. Our Data is written in Avro
>>>>>>>> format. We generate our rows based on our Avro schema. Over time the schema
>>>>>>>> is changing. I believe Beam SQL generates Java code based on what we define
>>>>>>>> as BeamSchema while submitting the pipeline. Do you have any idea How can
>>>>>>>> we handle schema changes with resubmitting our beam job. Is it possible to
>>>>>>>> generate SQL java code on the fly ?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>

Re: About Beam SQL Schema Changes and Code generation

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Thanks Reuven,

I can work on that. I know the internals of BeamSQL, but I could not figure
out how to replace a step's code with newly generated code after the
pipeline is submitted. Could you share your thoughts on this?

Thanks

On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:

> Thanks. It might be theoretically possible to do this (at least for the
> case where existing fields do not change). Whether anyone currently has
> available time to do this is a different question, but it's something that
> can be looked into.
>
> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <tu...@paloaltonetworks.com>
> wrote:
>
>> Adding new fields is more common than modifying existing fields. But type
>> change is also possible for existing fields, such as regular mandatory
>> field(string,integer) to union(nullable field). No field deletion.
>>
>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>>
>>> And when you say schema changes, are these new fields being added to the
>>> schema? Or are you making changes to the existing fields?
>>>
>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>> tuyarer@paloaltonetworks.com> wrote:
>>>
>>>> Hi,
>>>> For sure let me explain a little bit about my pipeline.
>>>> My Pipeline is actually simple
>>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn<KV<byte[], byte[]>,
>>>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from
>>>> Row to Avro (DoFn<Row, byte[]>)-> Write DB/GCS/GRPC etc
>>>>
>>>> On our jobs we have three types of SQL:
>>>> - SELECT * FROM PCOLLECTION
>>>> - SELECT * FROM PCOLLECTION <with Where Condition>
>>>> - SQL Projection with or without Where clause  SELECT col1, col2 FROM
>>>> PCOLLECTION
>>>>
>>>> We know writerSchema for each message. While deserializing avro binary
>>>> we use writer schema and reader schema on Convert Avro Bytes to Beam Row
>>>> step. It always produces a reader schema's generic record and we convert
>>>> that generic record to Row.
>>>> While submitting DF job we use latest schema to generate beamSchema.
>>>>
>>>> In the current scenario, when we have schema changes, first we restart
>>>> all 15k jobs with the latest updated schema, then whenever we are done we
>>>> turn on the latest schema for writers. Because of Avro's GrammarResolver[1]
>>>> we read different versions of the schema and we always produce the latest
>>>> schema's record. Without breaking our pipeline we are able to handle
>>>> multiple versions of data in the same streaming pipeline. If we can
>>>> generate the SQL's Java code when we get notified with the latest schema,
>>>> we will handle all schema changes. The only remaining obstacle is Beam's
>>>> SQL Java code. That's why I am looking for some solution. We don't need
>>>> multiple versions of SQL. We only need to regenerate the SQL schema with
>>>> the latest schema on the fly.
>>>>
>>>> I hope I can explain it :)
>>>>
>>>> Thanks
>>>>
>>>> [1]
>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__avro.apache.org_docs_1.7.2_api_java_org_apache_avro_io_parsing_doc-2Dfiles_parsing.html&d=DwMFaQ&c=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo&r=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g&m=0qahAe7vDisJq_hMYGY8F-Bp7-_5lOwOKzNoQ3r3-IQ&s=lwwIMsJO9nmj6_xZcSG_7qkBIaxOwyUXry4st1q70Rc&e=>
>>>>
>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Can you explain the use case some more? Are you wanting to change your
>>>>> SQL statement as well when the schema changes? If not, what are those new
>>>>> fields doing in the pipeline? What I mean is that your old SQL statement
>>>>> clearly didn't reference those fields in a SELECT statement since they
>>>>> didn't exist, so what are you missing by not having them unless you are
>>>>> also changing the SQL statement?
>>>>>
>>>>> Is this a case where you have a SELECT *, and just want to make sure
>>>>> those fields are included?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>
>>>>>> Hi Andrew,
>>>>>>
>>>>>> I assume SQL query is not going to change. Changing things is the Row
>>>>>> schema by adding new columns or rename columns. if we keep a version
>>>>>> information on somewhere for example a KV pair. Key is schema information,
>>>>>> value is Row. Can not we generate SQL code ? Why I am asking We have 15k
>>>>>> pipelines. When we have a schema change we restart a 15k DF job which is
>>>>>> pain. I am looking for a possible way to avoid job restart. Dont you think
>>>>>> it is not still doable ?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <ap...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Unfortunately we don't have a way to generate the SQL Java code on
>>>>>>> the fly, even if we did, that wouldn't solve your problem. I believe our
>>>>>>> recommended practice is to run both the old and new pipeline for some time,
>>>>>>> then pick a window boundary to transition the output from the old pipeline
>>>>>>> to the new one.
>>>>>>>
>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>>>>> schema evolution. They are optimized for performance at all costs.
>>>>>>>
>>>>>>> If you worked around this, the Beam model doesn't support changing
>>>>>>> the structure of the pipeline graph. This would significantly limit the
>>>>>>> changes you can make. It would also require some changes to SQL to try to
>>>>>>> produce the same plan for an updated SQL query.
>>>>>>>
>>>>>>> Andrew
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> We are using Beamsql on our pipeline. Our Data is written in Avro
>>>>>>>> format. We generate our rows based on our Avro schema. Over time the schema
>>>>>>>> is changing. I believe Beam SQL generates Java code based on what we define
>>>>>>>> as BeamSchema while submitting the pipeline. Do you have any idea How can
>>>>>>>> we handle schema changes with resubmitting our beam job. Is it possible to
>>>>>>>> generate SQL java code on the fly ?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>

Re: About Beam SQL Schema Changes and Code generation

Posted by Reuven Lax <re...@google.com>.
Thanks. It might be theoretically possible to do this (at least for the
case where existing fields do not change). Whether anyone currently has
available time to do this is a different question, but it's something that
can be looked into.

On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <tu...@paloaltonetworks.com>
wrote:

> Adding new fields is more common than modifying existing fields. But type
> change is also possible for existing fields, such as regular mandatory
> field(string,integer) to union(nullable field). No field deletion.
>
> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>
>> And when you say schema changes, are these new fields being added to the
>> schema? Or are you making changes to the existing fields?
>>
>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <tu...@paloaltonetworks.com>
>> wrote:
>>
>>> Hi,
>>> For sure let me explain a little bit about my pipeline.
>>> My Pipeline is actually simple
>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn<KV<byte[], byte[]>,
>>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from Row
>>> to Avro (DoFn<Row, byte[]>)-> Write DB/GCS/GRPC etc
>>>
>>> On our jobs we have three types of SQL:
>>> - SELECT * FROM PCOLLECTION
>>> - SELECT * FROM PCOLLECTION <with Where Condition>
>>> - SQL Projection with or without Where clause  SELECT col1, col2 FROM
>>> PCOLLECTION
>>>
>>> We know writerSchema for each message. While deserializing avro binary
>>> we use writer schema and reader schema on Convert Avro Bytes to Beam Row
>>> step. It always produces a reader schema's generic record and we convert
>>> that generic record to Row.
>>> While submitting DF job we use latest schema to generate beamSchema.
>>>
>>> In the current scenario, when we have schema changes, first we restart
>>> all 15k jobs with the latest updated schema, then whenever we are done we
>>> turn on the latest schema for writers. Because of Avro's GrammarResolver[1]
>>> we read different versions of the schema and we always produce the latest
>>> schema's record. Without breaking our pipeline we are able to handle
>>> multiple versions of data in the same streaming pipeline. If we can
>>> generate the SQL's Java code when we get notified with the latest schema,
>>> we will handle all schema changes. The only remaining obstacle is Beam's
>>> SQL Java code. That's why I am looking for some solution. We don't need
>>> multiple versions of SQL. We only need to regenerate the SQL schema with
>>> the latest schema on the fly.
>>>
>>> I hope I can explain it :)
>>>
>>> Thanks
>>>
>>> [1]
>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__avro.apache.org_docs_1.7.2_api_java_org_apache_avro_io_parsing_doc-2Dfiles_parsing.html&d=DwMFaQ&c=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo&r=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g&m=0qahAe7vDisJq_hMYGY8F-Bp7-_5lOwOKzNoQ3r3-IQ&s=lwwIMsJO9nmj6_xZcSG_7qkBIaxOwyUXry4st1q70Rc&e=>
>>>
>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Can you explain the use case some more? Are you wanting to change your
>>>> SQL statement as well when the schema changes? If not, what are those new
>>>> fields doing in the pipeline? What I mean is that your old SQL statement
>>>> clearly didn't reference those fields in a SELECT statement since they
>>>> didn't exist, so what are you missing by not having them unless you are
>>>> also changing the SQL statement?
>>>>
>>>> Is this a case where you have a SELECT *, and just want to make sure
>>>> those fields are included?
>>>>
>>>> Reuven
>>>>
>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi Andrew,
>>>>>
>>>>> I assume SQL query is not going to change. Changing things is the Row
>>>>> schema by adding new columns or rename columns. if we keep a version
>>>>> information on somewhere for example a KV pair. Key is schema information,
>>>>> value is Row. Can not we generate SQL code ? Why I am asking We have 15k
>>>>> pipelines. When we have a schema change we restart a 15k DF job which is
>>>>> pain. I am looking for a possible way to avoid job restart. Dont you think
>>>>> it is not still doable ?
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <ap...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Unfortunately we don't have a way to generate the SQL Java code on
>>>>>> the fly, even if we did, that wouldn't solve your problem. I believe our
>>>>>> recommended practice is to run both the old and new pipeline for some time,
>>>>>> then pick a window boundary to transition the output from the old pipeline
>>>>>> to the new one.
>>>>>>
>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>>>> schema evolution. They are optimized for performance at all costs.
>>>>>>
>>>>>> If you worked around this, the Beam model doesn't support changing
>>>>>> the structure of the pipeline graph. This would significantly limit the
>>>>>> changes you can make. It would also require some changes to SQL to try to
>>>>>> produce the same plan for an updated SQL query.
>>>>>>
>>>>>> Andrew
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> We are using Beamsql on our pipeline. Our Data is written in Avro
>>>>>>> format. We generate our rows based on our Avro schema. Over time the schema
>>>>>>> is changing. I believe Beam SQL generates Java code based on what we define
>>>>>>> as BeamSchema while submitting the pipeline. Do you have any idea How can
>>>>>>> we handle schema changes with resubmitting our beam job. Is it possible to
>>>>>>> generate SQL java code on the fly ?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>

Re: About Beam SQL Schema Changes and Code generation

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Adding new fields is more common than modifying existing fields. But a type
change is also possible for existing fields, such as a regular mandatory
field (string, integer) becoming a union (nullable field). There is no field
deletion.
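
(In Avro terms that kind of change looks like the snippet below; the field
name is just an example.)

    old: {"name": "source_ip", "type": "string"}
    new: {"name": "source_ip", "type": ["null", "string"], "default": null}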

On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:

> And when you say schema changes, are these new fields being added to the
> schema? Or are you making changes to the existing fields?
>
> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <tu...@paloaltonetworks.com>
> wrote:
>
>> Hi,
>> For sure let me explain a little bit about my pipeline.
>> My Pipeline is actually simple
>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn<KV<byte[], byte[]>,
>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from Row
>> to Avro (DoFn<Row, byte[]>)-> Write DB/GCS/GRPC etc
>>
>> In our jobs we have three types of SQL:
>> - SELECT * FROM PCOLLECTION
>> - SELECT * FROM PCOLLECTION <with Where Condition>
>> - SQL Projection with or without Where clause  SELECT col1, col2 FROM
>> PCOLLECTION
>>
>> We know writerSchema for each message. While deserializing avro binary we
>> use writer schema and reader schema on Convert Avro Bytes to Beam Row step.
>> It always produces a reader schema's generic record and we convert that
>> generic record to Row.
>> While submitting DF job we use latest schema to generate beamSchema.
>>
>> In the current scenario When we have schema changes first we restart all
>> 15k jobs with the latest updated schema then whenever we are done we turn
>> on the latest schema for writers. Because of Avro's grammar resolver[1] we
>> read different versions of the schema and we always produce the latest
>> schema's record. Without breaking our pipeline we are able to handle
>> multiple versions of data in the same streaming pipeline. If we can
>> generate SQL's Java code when we get notified with the latest schema we will
>> handle all schema changes. The only remaining obstacle is Beam's SQL Java
>> code. That's why I am looking for some solution. We dont need multiple
>> versions of SQL. We only need to regenerate SQL schema with the latest
>> schema on the fly.
>>
>> I hope I can explain it :)
>>
>> Thanks
>>
>> [1]
>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>
>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Can you explain the use case some more? Are you wanting to change your
>>> SQL statement as well when the schema changes? If not, what are those new
>>> fields doing in the pipeline? What I mean is that your old SQL statement
>>> clearly didn't reference those fields in a SELECT statement since they
>>> didn't exist, so what are you missing by not having them unless you are
>>> also changing the SQL statement?
>>>
>>> Is this a case where you have a SELECT *, and just want to make sure
>>> those fields are included?
>>>
>>> Reuven
>>>
>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>> tuyarer@paloaltonetworks.com> wrote:
>>>
>>>> Hi Andrew,
>>>>
>>>> I assume the SQL query is not going to change. What changes is the Row
>>>> schema, by adding new columns or renaming columns. If we keep version
>>>> information somewhere, for example as a KV pair where the key is the
>>>> schema information and the value is the Row, can't we regenerate the SQL
>>>> code? The reason I ask is that we have 15k pipelines; when we have a
>>>> schema change we restart 15k DF jobs, which is painful. I am looking for
>>>> a possible way to avoid the job restarts. Don't you think it is still
>>>> doable?
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <ap...@google.com>
>>>> wrote:
>>>>
>>>>> Unfortunately we don't have a way to generate the SQL Java code on the
>>>>> fly, even if we did, that wouldn't solve your problem. I believe our
>>>>> recommended practice is to run both the old and new pipeline for some time,
>>>>> then pick a window boundary to transition the output from the old pipeline
>>>>> to the new one.
>>>>>
>>>>> Beam doesn't handle changing the format of data sent between
>>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>>> schema evolution. They are optimized for performance at all costs.
>>>>>
>>>>> If you worked around this, the Beam model doesn't support changing the
>>>>> structure of the pipeline graph. This would significantly limit the changes
>>>>> you can make. It would also require some changes to SQL to try to produce
>>>>> the same plan for an updated SQL query.
>>>>>
>>>>> Andrew
>>>>>
>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We are using Beamsql on our pipeline. Our Data is written in Avro
>>>>>> format. We generate our rows based on our Avro schema. Over time the schema
>>>>>> is changing. I believe Beam SQL generates Java code based on what we define
>>>>>> as BeamSchema while submitting the pipeline. Do you have any idea How can
>>>>>> we handle schema changes with resubmitting our beam job. Is it possible to
>>>>>> generate SQL java code on the fly ?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>

Re: About Beam SQL Schema Changes and Code generation

Posted by Reuven Lax <re...@google.com>.
And when you say schema changes, are these new fields being added to the
schema? Or are you making changes to the existing fields?

On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <tu...@paloaltonetworks.com>
wrote:

> Hi,
> For sure let me explain a little bit about my pipeline.
> My Pipeline is actually simple
> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn<KV<byte[], byte[]>,
> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from Row
> to Avro (DoFn<Row, byte[]>)-> Write DB/GCS/GRPC etc
>
> In our jobs we have three types of SQL:
> - SELECT * FROM PCOLLECTION
> - SELECT * FROM PCOLLECTION <with Where Condition>
> - SQL Projection with or without Where clause  SELECT col1, col2 FROM
> PCOLLECTION
>
> We know writerSchema for each message. While deserializing avro binary we
> use writer schema and reader schema on Convert Avro Bytes to Beam Row step.
> It always produces a reader schema's generic record and we convert that
> generic record to Row.
> While submitting DF job we use latest schema to generate beamSchema.
>
> In the current scenario When we have schema changes first we restart all
> 15k jobs with the latest updated schema then whenever we are done we turn
> on the latest schema for writers. Because of Avro's grammar resolver[1] we
> read different versions of the schema and we always produce the latest
> schema's record. Without breaking our pipeline we are able to handle
> multiple versions of data in the same streaming pipeline. If we can
> generate SQL's Java code when we get notified with the latest schema we will
> handle all schema changes. The only remaining obstacle is Beam's SQL Java
> code. That's why I am looking for some solution. We dont need multiple
> versions of SQL. We only need to regenerate SQL schema with the latest
> schema on the fly.
>
> I hope I can explain it :)
>
> Thanks
>
> [1]
> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>
> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>
>> Can you explain the use case some more? Are you wanting to change your
>> SQL statement as well when the schema changes? If not, what are those new
>> fields doing in the pipeline? What I mean is that your old SQL statement
>> clearly didn't reference those fields in a SELECT statement since they
>> didn't exist, so what are you missing by not having them unless you are
>> also changing the SQL statement?
>>
>> Is this a case where you have a SELECT *, and just want to make sure
>> those fields are included?
>>
>> Reuven
>>
>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <tu...@paloaltonetworks.com>
>> wrote:
>>
>>> Hi Andrew,
>>>
>>> I assume the SQL query is not going to change. What changes is the Row
>>> schema, by adding new columns or renaming columns. If we keep version
>>> information somewhere, for example as a KV pair where the key is the
>>> schema information and the value is the Row, can't we regenerate the SQL
>>> code? The reason I ask is that we have 15k pipelines; when we have a
>>> schema change we restart 15k DF jobs, which is painful. I am looking for
>>> a possible way to avoid the job restarts. Don't you think it is still
>>> doable?
>>>
>>> Thanks
>>>
>>>
>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <ap...@google.com>
>>> wrote:
>>>
>>>> Unfortunately we don't have a way to generate the SQL Java code on the
>>>> fly, even if we did, that wouldn't solve your problem. I believe our
>>>> recommended practice is to run both the old and new pipeline for some time,
>>>> then pick a window boundary to transition the output from the old pipeline
>>>> to the new one.
>>>>
>>>> Beam doesn't handle changing the format of data sent between
>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>> data between steps of the pipeline. The builtin coders (including the
>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>> schema evolution. They are optimized for performance at all costs.
>>>>
>>>> If you worked around this, the Beam model doesn't support changing the
>>>> structure of the pipeline graph. This would significantly limit the changes
>>>> you can make. It would also require some changes to SQL to try to produce
>>>> the same plan for an updated SQL query.
>>>>
>>>> Andrew
>>>>
>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>> tuyarer@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We are using Beamsql on our pipeline. Our Data is written in Avro
>>>>> format. We generate our rows based on our Avro schema. Over time the schema
>>>>> is changing. I believe Beam SQL generates Java code based on what we define
>>>>> as BeamSchema while submitting the pipeline. Do you have any idea How can
>>>>> we handle schema changes with resubmitting our beam job. Is it possible to
>>>>> generate SQL java code on the fly ?
>>>>>
>>>>> Thanks
>>>>>
>>>>

Re: About Beam SQL Schema Changes and Code generation

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Hi,
Sure, let me explain a little bit about my pipeline.
My pipeline is actually simple:
Read Kafka -> Convert Avro bytes to Beam Row (DoFn<KV<byte[], byte[]>, Row>) ->
Apply Filter (SqlTransform.query(sql)) -> Convert back from Row to Avro
(DoFn<Row, byte[]>) -> Write DB/GCS/gRPC etc.

In our jobs we have three types of SQL:
- SELECT * FROM PCOLLECTION
- SELECT * FROM PCOLLECTION <with Where Condition>
- SQL projection with or without a WHERE clause: SELECT col1, col2 FROM
PCOLLECTION
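
To make the shape concrete, here is a minimal, self-contained sketch of such
a pipeline in Beam Java. It is only an illustration: the schema, field names
and filter are made up, and the Kafka read plus the real Avro-byte conversion
are replaced by a Create step and AvroUtils helpers so it runs standalone.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class FilterPipelineSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // In the real pipeline this is the latest Avro schema; here it is made up.
    org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"LogRecord\",\"fields\":["
            + "{\"name\":\"col1\",\"type\":\"string\"},"
            + "{\"name\":\"col2\",\"type\":\"long\"}]}");
    // Kept as a String so the anonymous function below stays serializable.
    String avroSchemaString = avroSchema.toString();
    Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);

    // Stand-in for "Read Kafka -> Convert Avro bytes to Beam Row": one record,
    // converted to a Row in the reader (latest) schema.
    GenericRecord record = new GenericRecordBuilder(avroSchema)
        .set("col1", "allowed").set("col2", 42L).build();
    PCollection<Row> rows = pipeline.apply("CreateRows",
        Create.of(AvroUtils.toBeamRowStrict(record, beamSchema)).withRowSchema(beamSchema));

    // The SqlTransform step; the generated SQL code is tied to beamSchema.
    PCollection<Row> filtered = rows.apply("Filter",
        SqlTransform.query("SELECT * FROM PCOLLECTION WHERE col1 = 'allowed'"));

    // Stand-in for "Convert back from Row to Avro -> write to DB/GCS/gRPC".
    filtered
        .apply("RowToAvro", MapElements.via(new SimpleFunction<Row, GenericRecord>() {
          @Override
          public GenericRecord apply(Row row) {
            return AvroUtils.toGenericRecord(
                row, new org.apache.avro.Schema.Parser().parse(avroSchemaString));
          }
        }))
        .setCoder(AvroCoder.of(GenericRecord.class, avroSchema));

    pipeline.run().waitUntilFinish();
  }
}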

We know the writer schema for each message. While deserializing the Avro
binary in the Convert Avro Bytes to Beam Row step we use both the writer
schema and the reader schema, so it always produces a generic record in the
reader schema, which we then convert to a Row.
While submitting the DF job we use the latest schema to generate the Beam schema.
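
A minimal sketch of that decode step, with made-up writer and reader schemas:
Avro's schema resolution turns a record written with the old schema into one
that matches the latest (reader) schema.

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class ResolvingDecodeExample {
  public static void main(String[] args) throws Exception {
    Schema writerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"LogRecord\",\"fields\":["
            + "{\"name\":\"col1\",\"type\":\"string\"}]}");
    Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"LogRecord\",\"fields\":["
            + "{\"name\":\"col1\",\"type\":\"string\"},"
            + "{\"name\":\"col2\",\"type\":[\"null\",\"long\"],\"default\":null}]}");

    // Encode one record with the old writer schema.
    GenericRecord oldRecord = new GenericData.Record(writerSchema);
    oldRecord.put("col1", "value");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writerSchema).write(oldRecord, encoder);
    encoder.flush();

    // Decode with writer + reader schemas: Avro's schema resolution fills in
    // the new nullable field, so the result is always in the latest schema.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord latest =
        new GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder);

    System.out.println(latest); // {"col1": "value", "col2": null}
    // In the pipeline, `latest` would then be converted to a Beam Row, e.g. with
    // AvroUtils.toBeamRowStrict(latest, AvroUtils.toBeamSchema(readerSchema)).
  }
}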

In the current scenario, when we have schema changes we first restart all
15k jobs with the latest updated schema, and once that is done we turn on
the latest schema for the writers. Because of Avro's grammar-based schema
resolution[1] we can read different versions of the schema and always
produce a record in the latest schema. So without breaking our pipeline we
are able to handle multiple versions of data in the same streaming
pipeline. If we could regenerate the SQL's Java code when we get notified
of the latest schema, we would be able to handle all schema changes. The
only remaining obstacle is Beam's generated SQL Java code; that's why I am
looking for a solution. We don't need multiple versions of the SQL. We only
need to regenerate the SQL schema with the latest schema on the fly.

I hope that explains it :)

Thanks

[1]
https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html

On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:

> Can you explain the use case some more? Are you wanting to change your SQL
> statement as well when the schema changes? If not, what are those new
> fields doing in the pipeline? What I mean is that your old SQL statement
> clearly didn't reference those fields in a SELECT statement since they
> didn't exist, so what are you missing by not having them unless you are
> also changing the SQL statement?
>
> Is this a case where you have a SELECT *, and just want to make sure those
> fields are included?
>
> Reuven
>
> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <tu...@paloaltonetworks.com>
> wrote:
>
>> Hi Andrew,
>>
>> I assume the SQL query is not going to change. What changes is the Row
>> schema, by adding new columns or renaming columns. If we keep version
>> information somewhere, for example as a KV pair where the key is the
>> schema information and the value is the Row, can't we regenerate the SQL
>> code? The reason I ask is that we have 15k pipelines; when we have a
>> schema change we restart 15k DF jobs, which is painful. I am looking for
>> a possible way to avoid the job restarts. Don't you think it is still
>> doable?
>>
>> Thanks
>>
>>
>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <ap...@google.com>
>> wrote:
>>
>>> Unfortunately we don't have a way to generate the SQL Java code on the
>>> fly, even if we did, that wouldn't solve your problem. I believe our
>>> recommended practice is to run both the old and new pipeline for some time,
>>> then pick a window boundary to transition the output from the old pipeline
>>> to the new one.
>>>
>>> Beam doesn't handle changing the format of data sent between
>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>> data between steps of the pipeline. The builtin coders (including the
>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>> schema evolution. They are optimized for performance at all costs.
>>>
>>> If you worked around this, the Beam model doesn't support changing the
>>> structure of the pipeline graph. This would significantly limit the changes
>>> you can make. It would also require some changes to SQL to try to produce
>>> the same plan for an updated SQL query.
>>>
>>> Andrew
>>>
>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>> tuyarer@paloaltonetworks.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> We are using Beamsql on our pipeline. Our Data is written in Avro
>>>> format. We generate our rows based on our Avro schema. Over time the schema
>>>> is changing. I believe Beam SQL generates Java code based on what we define
>>>> as BeamSchema while submitting the pipeline. Do you have any idea How can
>>>> we handle schema changes with resubmitting our beam job. Is it possible to
>>>> generate SQL java code on the fly ?
>>>>
>>>> Thanks
>>>>
>>>

Re: About Beam SQL Schema Changes and Code generation

Posted by Reuven Lax <re...@google.com>.
Can you explain the use case some more? Are you wanting to change your SQL
statement as well when the schema changes? If not, what are those new
fields doing in the pipeline? What I mean is that your old SQL statement
clearly didn't reference those fields in a SELECT statement since they
didn't exist, so what are you missing by not having them unless you are
also changing the SQL statement?

Is this a case where you have a SELECT *, and just want to make sure those
fields are included?

Reuven

On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <tu...@paloaltonetworks.com>
wrote:

> Hi Andrew,
>
> I assume the SQL query is not going to change. What changes is the Row
> schema, by adding new columns or renaming columns. If we keep version
> information somewhere, for example as a KV pair where the key is the
> schema information and the value is the Row, can't we regenerate the SQL
> code? The reason I ask is that we have 15k pipelines; when we have a
> schema change we restart 15k DF jobs, which is painful. I am looking for
> a possible way to avoid the job restarts. Don't you think it is still
> doable?
>
> Thanks
>
>
> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <ap...@google.com> wrote:
>
>> Unfortunately we don't have a way to generate the SQL Java code on the
>> fly, even if we did, that wouldn't solve your problem. I believe our
>> recommended practice is to run both the old and new pipeline for some time,
>> then pick a window boundary to transition the output from the old pipeline
>> to the new one.
>>
>> Beam doesn't handle changing the format of data sent between intermediate
>> steps in a running pipeline. Beam uses "coders" to serialize data between
>> steps of the pipeline. The builtin coders (including the Schema Row Coder
>> used by SQL) have a fixed data format and don't handle schema evolution.
>> They are optimized for performance at all costs.
>>
>> If you worked around this, the Beam model doesn't support changing the
>> structure of the pipeline graph. This would significantly limit the changes
>> you can make. It would also require some changes to SQL to try to produce
>> the same plan for an updated SQL query.
>>
>> Andrew
>>
>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <tu...@paloaltonetworks.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We are using Beamsql on our pipeline. Our Data is written in Avro
>>> format. We generate our rows based on our Avro schema. Over time the schema
>>> is changing. I believe Beam SQL generates Java code based on what we define
>>> as BeamSchema while submitting the pipeline. Do you have any idea How can
>>> we handle schema changes with resubmitting our beam job. Is it possible to
>>> generate SQL java code on the fly ?
>>>
>>> Thanks
>>>
>>

Re: About Beam SQL Schema Changes and Code generation

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Hi Andrew,

I assume the SQL query is not going to change. What changes is the Row
schema, by adding new columns or renaming columns. If we keep version
information somewhere, for example as a KV pair where the key is the schema
information and the value is the Row, can't we regenerate the SQL code? The
reason I ask is that we have 15k pipelines; when we have a schema change we
restart 15k DF jobs, which is painful. I am looking for a possible way to
avoid the job restarts. Don't you think it is still doable?
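
Very roughly, the idea could look like the sketch below: each Row gets tagged
with an identifier of the Avro schema version it came from (here a schema
fingerprint). This is only an illustration of the proposal, not something
Beam SQL supports today, and all names are hypothetical.

import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;

// Tags each Row with a 64-bit fingerprint of the Avro schema it was built
// from, so a downstream step could in principle look up the matching
// generated SQL code for that schema version.
class TagRowWithSchemaVersionFn extends DoFn<Row, KV<Long, Row>> {
  private final long schemaFingerprint;

  TagRowWithSchemaVersionFn(Schema avroSchema) {
    this.schemaFingerprint = SchemaNormalization.parsingFingerprint64(avroSchema);
  }

  @ProcessElement
  public void processElement(@Element Row row, OutputReceiver<KV<Long, Row>> out) {
    out.output(KV.of(schemaFingerprint, row));
  }
}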

Thanks


On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <ap...@google.com> wrote:

> Unfortunately we don't have a way to generate the SQL Java code on the
> fly, even if we did, that wouldn't solve your problem. I believe our
> recommended practice is to run both the old and new pipeline for some time,
> then pick a window boundary to transition the output from the old pipeline
> to the new one.
>
> Beam doesn't handle changing the format of data sent between intermediate
> steps in a running pipeline. Beam uses "coders" to serialize data between
> steps of the pipeline. The builtin coders (including the Schema Row Coder
> used by SQL) have a fixed data format and don't handle schema evolution.
> They are optimized for performance at all costs.
>
> If you worked around this, the Beam model doesn't support changing the
> structure of the pipeline graph. This would significantly limit the changes
> you can make. It would also require some changes to SQL to try to produce
> the same plan for an updated SQL query.
>
> Andrew
>
> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <tu...@paloaltonetworks.com>
> wrote:
>
>> Hi,
>>
>> We are using Beamsql on our pipeline. Our Data is written in Avro format.
>> We generate our rows based on our Avro schema. Over time the schema is
>> changing. I believe Beam SQL generates Java code based on what we define as
>> BeamSchema while submitting the pipeline. Do you have any idea How can we
>> handle schema changes with resubmitting our beam job. Is it possible to
>> generate SQL java code on the fly ?
>>
>> Thanks
>>
>

Re: About Beam SQL Schema Changes and Code generation

Posted by Andrew Pilloud <ap...@google.com>.
Unfortunately we don't have a way to generate the SQL Java code on the fly;
even if we did, that wouldn't solve your problem. I believe our recommended
practice is to run both the old and new pipelines for some time, then pick a
window boundary to transition the output from the old pipeline to the new
one.

Beam doesn't handle changing the format of data sent between intermediate
steps in a running pipeline. Beam uses "coders" to serialize data between
steps of the pipeline. The builtin coders (including the Schema Row Coder
used by SQL) have a fixed data format and don't handle schema evolution.
They are optimized for performance at all costs.
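
To illustrate the fixed-format point, here is a small, hypothetical example:
the Row coder is built from one Beam schema at pipeline construction time,
and the encoded bytes are only interpretable against that exact schema.

import java.io.ByteArrayOutputStream;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

public class RowCoderExample {
  public static void main(String[] args) throws Exception {
    // Made-up schema; in SQL this would come from the BeamSchema at submit time.
    Schema schema = Schema.builder()
        .addStringField("col1")
        .addInt64Field("col2")
        .build();

    // The coder is fixed at pipeline construction time from this schema.
    RowCoder coder = RowCoder.of(schema);

    Row row = Row.withSchema(schema).addValues("value", 42L).build();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    coder.encode(row, out);
    System.out.println(out.size() + " bytes encoded");

    // A Row built against a different schema (e.g. with an extra field) cannot
    // be decoded with this coder; that is why adding fields currently requires
    // resubmitting the pipeline.
  }
}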

If you worked around this, the Beam model doesn't support changing the
structure of the pipeline graph. This would significantly limit the changes
you can make. It would also require some changes to SQL to try to produce
the same plan for an updated SQL query.

Andrew

On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <tu...@paloaltonetworks.com>
wrote:

> Hi,
>
> We are using Beamsql on our pipeline. Our Data is written in Avro format.
> We generate our rows based on our Avro schema. Over time the schema is
> changing. I believe Beam SQL generates Java code based on what we define as
> BeamSchema while submitting the pipeline. Do you have any idea How can we
> handle schema changes with resubmitting our beam job. Is it possible to
> generate SQL java code on the fly ?
>
> Thanks
>