Posted to user@beam.apache.org by Tao Li <ta...@zillow.com> on 2021/01/08 18:10:35 UTC

Potential bug with Select.Flattened?

Hi Beam community,

According to the Beam programming guide (https://beam.apache.org/documentation/programming-guide/#what-is-a-schema), the Select.Flattened transform can flatten the nested fields of an array. See transactions_bank and transactions_purchaseAmount in the screenshot below: both are of ARRAY type, meaning the nested fields of the transactions field have been flattened to the top level as arrays.

[Image: screenshot of the programming guide's flattened schema, showing transactions_bank and transactions_purchaseAmount as ARRAY fields]
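As a rough illustration of the naming rule the guide's example implies, here is a JDK-only Java model. It is not Beam's implementation, and FlattenModel and its nested types are hypothetical. It assumes, based only on the transactions_bank and transactions_purchaseAmount fields shown in the guide, that nested row fields become parent_child top-level fields and that each subfield of an array of rows becomes its own ARRAY-typed top-level field.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical JDK-only model of the flattening rule the guide appears to
// describe; NOT Beam's implementation. Type names are plain strings here.
final class FlattenModel {
    abstract static class Type {}

    static final class Leaf extends Type {
        final String name;
        Leaf(String name) { this.name = name; }
    }

    static final class ArrayType extends Type {
        final Type element;
        ArrayType(Type element) { this.element = element; }
    }

    static final class RowType extends Type {
        final Map<String, Type> fields = new LinkedHashMap<>();
        RowType add(String name, Type type) { fields.put(name, type); return this; }
    }

    // Flattens a row type into top-level "parent_child" names mapped to type strings.
    static Map<String, String> flatten(RowType row) {
        Map<String, String> out = new LinkedHashMap<>();
        flattenInto("", row, 0, out);
        return out;
    }

    // arrayDepth counts the enclosing arrays-of-rows; each leaf below them is
    // wrapped in that many ARRAY<...> layers.
    private static void flattenInto(String prefix, RowType row, int arrayDepth, Map<String, String> out) {
        for (Map.Entry<String, Type> e : row.fields.entrySet()) {
            String name = prefix.isEmpty() ? e.getKey() : prefix + "_" + e.getKey();
            Type t = e.getValue();
            if (t instanceof RowType) {
                flattenInto(name, (RowType) t, arrayDepth, out);
            } else if (t instanceof ArrayType && ((ArrayType) t).element instanceof RowType) {
                flattenInto(name, (RowType) ((ArrayType) t).element, arrayDepth + 1, out);
            } else {
                String desc = describe(t);
                for (int i = 0; i < arrayDepth; i++) {
                    desc = "ARRAY<" + desc + ">";
                }
                out.put(name, desc);
            }
        }
    }

    private static String describe(Type t) {
        if (t instanceof Leaf) return ((Leaf) t).name;
        if (t instanceof ArrayType) return "ARRAY<" + describe(((ArrayType) t).element) + ">";
        return "ROW";
    }

    // Mirrors the schema built in the post.
    static RowType exampleSchema() {
        RowType address = new RowType()
                .add("streetAddress", new Leaf("STRING"))
                .add("city", new Leaf("STRING"));
        RowType transaction = new RowType()
                .add("bank", new Leaf("STRING"))
                .add("purchaseAmount", new Leaf("DOUBLE"));
        return new RowType()
                .add("userId", new Leaf("STRING"))
                .add("shippingAddress", address)
                .add("transactions", new ArrayType(transaction));
    }

    public static void main(String[] args) {
        flatten(exampleSchema()).forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```

Under these assumptions, main prints userId, shippingAddress_streetAddress and shippingAddress_city as STRING, then transactions_bank as ARRAY<STRING> and transactions_purchaseAmount as ARRAY<DOUBLE>; no plain "transactions" field remains.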

However, in my test with the direct runner, these two nested fields (bank and purchaseAmount) were not flattened by Select.Flattened. The original and flattened schemas are below. Is this a typo in the programming guide, or a bug in the API? Thanks!

Original schema:

Field{name=userId, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=shippingAddress, description=, type=FieldType{typeName=ROW, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=Fields:
Field{name=streetAddress, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=city, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Options:{{}}, metadata={}}, options={{}}}
Field{name=transactions, description=, type=FieldType{typeName=ARRAY, nullable=false, logicalType=null, collectionElementType=FieldType{typeName=ROW, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=Fields:
Field{name=bank, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=purchaseAmount, description=, type=FieldType{typeName=DOUBLE, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Options:{{}}, metadata={}}, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Options:{{}}

Flattened schema:

Field{name=userId, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=transactions, description=, type=FieldType{typeName=ARRAY, nullable=false, logicalType=null, collectionElementType=FieldType{typeName=ROW, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=Fields:
Field{name=bank, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=purchaseAmount, description=, type=FieldType{typeName=DOUBLE, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Options:{{}}, metadata={}}, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=shippingAddress_streetAddress, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=shippingAddress_city, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Options:{{}}
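To pin down exactly how the observed output differs from what the guide suggests, a small JDK-only comparison of top-level field names can help. SchemaDiff is a hypothetical helper; the EXPECTED list is an assumption derived from the guide's naming convention, while OBSERVED is copied from the flattened schema above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper comparing expected vs. observed top-level field names.
final class SchemaDiff {
    // Assumed from the guide's example: transactions_* appear as separate fields.
    static final List<String> EXPECTED = Arrays.asList(
            "userId", "shippingAddress_streetAddress", "shippingAddress_city",
            "transactions_bank", "transactions_purchaseAmount");

    // Top-level names from the "Flattened schema" output in this post.
    static final List<String> OBSERVED = Arrays.asList(
            "userId", "transactions", "shippingAddress_streetAddress", "shippingAddress_city");

    // Names in a that are absent from b, in a's order.
    static List<String> minus(List<String> a, List<String> b) {
        Set<String> bs = new HashSet<>(b);
        List<String> out = new ArrayList<>();
        for (String s : a) {
            if (!bs.contains(s)) {
                out.add(s);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("missing:    " + minus(EXPECTED, OBSERVED));
        // prints "missing:    [transactions_bank, transactions_purchaseAmount]"
        System.out.println("unexpected: " + minus(OBSERVED, EXPECTED));
        // prints "unexpected: [transactions]"
    }
}
```

The nested shippingAddress row was flattened as expected; only the array-of-rows field was left intact.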


Here is the code to create the input data:

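import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Element schema of the "transactions" array.
Schema transactionSchema = Schema.builder()
        .addStringField("bank")
        .addDoubleField("purchaseAmount")
        .build();
Row transactionOne = Row
        .withSchema(transactionSchema)
        .addValues("foo", 1.0)
        .build();
Row transactionTwo = Row
        .withSchema(transactionSchema)
        .addValues("bar", 2.0)
        .build();
// Schema of the nested "shippingAddress" row.
Schema shippingAddressSchema = Schema.builder()
        .addStringField("streetAddress")
        .addStringField("city")
        .build();
Row address = Row
        .withSchema(shippingAddressSchema)
        .addValues("street", "city")
        .build();
// Top-level schema: a string, a nested row, and an array of rows.
Schema schema = Schema.builder()
        .addStringField("userId")
        .addRowField("shippingAddress", shippingAddressSchema)
        .addArrayField("transactions", Schema.FieldType.row(transactionSchema))
        .build();
Row row = Row
        .withSchema(schema)
        .addValues("user", address)
        .addArray(transactionOne, transactionTwo)
        .build();
// executionContext is not shown in the post; it is assumed to supply the Pipeline.
PCollection<Row> input = executionContext
        .getPipeline()
        .apply(Create.of(row))
        .setRowSchema(schema);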

Here is the transform code:
PCollection<Row> flattened = input.apply(Select.flattenedSchema());
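At the value level, the guide's semantics would turn the transactions array into one array per subfield. The following JDK-only sketch models that on the poster's two transactions, representing each row as a field-name-to-value map. That representation is an assumption for illustration; ArrayOfRowsFlatten is hypothetical and unrelated to Beam's Row API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: turns an array of rows into one list per subfield,
// named parent_subfield, like the ARRAY-typed transactions_* fields.
final class ArrayOfRowsFlatten {
    // Assumes every row carries the same keys, as rows sharing one schema do,
    // so the i-th entry of each output list comes from the i-th row.
    static Map<String, List<Object>> flattenArray(String parent, List<Map<String, Object>> rows) {
        Map<String, List<Object>> out = new LinkedHashMap<>();
        for (Map<String, Object> row : rows) {
            for (Map.Entry<String, Object> e : row.entrySet()) {
                out.computeIfAbsent(parent + "_" + e.getKey(), k -> new ArrayList<>())
                        .add(e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> one = new LinkedHashMap<>();
        one.put("bank", "foo");
        one.put("purchaseAmount", 1.0);
        Map<String, Object> two = new LinkedHashMap<>();
        two.put("bank", "bar");
        two.put("purchaseAmount", 2.0);
        List<Map<String, Object>> transactions = Arrays.asList(one, two);
        System.out.println(flattenArray("transactions", transactions));
        // prints {transactions_bank=[foo, bar], transactions_purchaseAmount=[1.0, 2.0]}
    }
}
```

Iterating rows in order keeps the output lists aligned by position. One caveat of this value-driven sketch: an empty array yields no entries at all, whereas a schema-driven implementation would presumably still emit an empty array for each subfield.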

Re: Potential bug with Select.Flattened?

Posted by Tao Li <ta...@zillow.com>.
@Brian Hulette, thanks for the quick response! It’s about the “transactions” field, which is an array of rows; each row consists of a “bank” field and a “purchaseAmount” field.


From: Brian Hulette <bh...@google.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Friday, January 8, 2021 at 10:29 AM
To: user <us...@beam.apache.org>, dev <de...@beam.apache.org>
Subject: Re: Potential bug with Select.Flattened?

+dev <de...@beam.apache.org>

Hey Tao,
Thanks for reporting this; it does look like a bug. Your description "flatten the nested fields of an array" threw me off a little bit: it looks like what's failing is flattening a struct that contains array fields. Does that sound right to you?

I filed BEAM-11585 [1] to track this.

Brian

[1] https://issues.apache.org/jira/browse/BEAM-11585

Re: Potential bug with Select.Flattened?

Posted by Brian Hulette <bh...@google.com>.
+dev <de...@beam.apache.org>

Hey Tao,
Thanks for reporting this; it does look like a bug. Your description
"flatten the nested fields of an array" threw me off a little bit: it looks
like what's failing is flattening a struct that contains array fields. Does
that sound right to you?

I filed BEAM-11585 [1] to track this.

Brian

[1] https://issues.apache.org/jira/browse/BEAM-11585
