Posted to user@beam.apache.org by Tao Li <ta...@zillow.com> on 2021/01/08 18:10:35 UTC
Potential bug with Select.Flattened?
Hi Beam community,
According to the Beam programming guide <https://beam.apache.org/documentation/programming-guide/#what-is-a-schema>, the Select.Flattened transform can flatten the nested fields of an array. Please see transactions_bank and transactions_purchaseAmount below, which are both of ARRAY type, meaning the nested fields of the transactions field have been flattened to the top level as arrays.
[image: screenshot from the programming guide]
However, in my test with the direct runner, these two nested fields (bank and purchaseAmount) were not flattened by Select.Flattened. Please see the original schema and the flattened schema below. Is this a typo in the programming guide or a bug in the API? Thanks!
Original schema:
Field{name=userId, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=shippingAddress, description=, type=FieldType{typeName=ROW, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=Fields:
Field{name=streetAddress, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=city, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Options:{{}}, metadata={}}, options={{}}}
Field{name=transactions, description=, type=FieldType{typeName=ARRAY, nullable=false, logicalType=null, collectionElementType=FieldType{typeName=ROW, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=Fields:
Field{name=bank, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=purchaseAmount, description=, type=FieldType{typeName=DOUBLE, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Options:{{}}, metadata={}}, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Options:{{}}
Flattened schema:
Field{name=userId, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=transactions, description=, type=FieldType{typeName=ARRAY, nullable=false, logicalType=null, collectionElementType=FieldType{typeName=ROW, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=Fields:
Field{name=bank, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=purchaseAmount, description=, type=FieldType{typeName=DOUBLE, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Options:{{}}, metadata={}}, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=shippingAddress_streetAddress, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Field{name=shippingAddress_city, description=, type=FieldType{typeName=STRING, nullable=false, logicalType=null, collectionElementType=null, mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}}, options={{}}}
Options:{{}}
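To make the expected result concrete, here is a minimal plain-Java sketch of the flattening rule the programming guide describes. It does not use Beam: the Type class and the flatten helper are hypothetical stand-ins, and the naming rule (parent_child, with fields under an array of rows becoming arrays) is inferred from the field names quoted above, not taken from Beam's source.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlattenSchemaSketch {
    /** Minimal stand-in for a field type; NOT Beam's Schema.FieldType. */
    static final class Type {
        final String name;               // "STRING", "DOUBLE", "ROW" or "ARRAY"
        final Map<String, Type> fields;  // set only for ROW
        final Type element;              // set only for ARRAY

        private Type(String name, Map<String, Type> fields, Type element) {
            this.name = name;
            this.fields = fields;
            this.element = element;
        }
        static Type primitive(String name) { return new Type(name, null, null); }
        static Type row(Map<String, Type> fields) { return new Type("ROW", fields, null); }
        static Type array(Type element) { return new Type("ARRAY", null, element); }
    }

    /** Returns flattened field name -> type description, in declaration order. */
    static Map<String, String> flatten(Map<String, Type> schema) {
        Map<String, String> out = new LinkedHashMap<>();
        flattenInto("", schema, 0, out);
        return out;
    }

    private static void flattenInto(
            String prefix, Map<String, Type> fields, int arrayDepth, Map<String, String> out) {
        for (Map.Entry<String, Type> e : fields.entrySet()) {
            String name = prefix.isEmpty() ? e.getKey() : prefix + "_" + e.getKey();
            Type t = e.getValue();
            int depth = arrayDepth;
            // Unwrap arrays, remembering how many levels were crossed.
            while ("ARRAY".equals(t.name)) {
                depth++;
                t = t.element;
            }
            if ("ROW".equals(t.name)) {
                // Nested row: recurse so its fields surface as name_child.
                flattenInto(name, t.fields, depth, out);
            } else {
                out.put(name, wrap(t.name, depth));
            }
        }
    }

    private static String wrap(String base, int depth) {
        String s = base;
        for (int i = 0; i < depth; i++) {
            s = "ARRAY[" + s + "]";
        }
        return s;
    }

    /** The schema from this report, rebuilt with the stand-in types. */
    static Map<String, Type> exampleSchema() {
        Map<String, Type> address = new LinkedHashMap<>();
        address.put("streetAddress", Type.primitive("STRING"));
        address.put("city", Type.primitive("STRING"));

        Map<String, Type> transaction = new LinkedHashMap<>();
        transaction.put("bank", Type.primitive("STRING"));
        transaction.put("purchaseAmount", Type.primitive("DOUBLE"));

        Map<String, Type> schema = new LinkedHashMap<>();
        schema.put("userId", Type.primitive("STRING"));
        schema.put("shippingAddress", Type.row(address));
        schema.put("transactions", Type.array(Type.row(transaction)));
        return schema;
    }

    public static void main(String[] args) {
        flatten(exampleSchema()).forEach((name, type) -> System.out.println(name + " " + type));
    }
}
```

For the schema in this report, the sketch yields userId, shippingAddress_streetAddress, shippingAddress_city, transactions_bank (ARRAY[STRING]) and transactions_purchaseAmount (ARRAY[DOUBLE]), whereas the flattened schema observed above still carries transactions as a single ARRAY of ROW.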
Here is the code to create the input data:
Schema transactionSchema = Schema.builder()
    .addStringField("bank")
    .addDoubleField("purchaseAmount")
    .build();

Row transactionOne = Row
    .withSchema(transactionSchema)
    .addValues("foo", 1.0)
    .build();

Row transactionTwo = Row
    .withSchema(transactionSchema)
    .addValues("bar", 2.0)
    .build();

Schema shippingAddressSchema = Schema.builder()
    .addStringField("streetAddress")
    .addStringField("city")
    .build();

Row address = Row
    .withSchema(shippingAddressSchema)
    .addValues("street", "city")
    .build();

Schema schema = Schema.builder()
    .addStringField("userId")
    .addRowField("shippingAddress", shippingAddressSchema)
    .addArrayField("transactions", Schema.FieldType.row(transactionSchema))
    .build();

Row row = Row
    .withSchema(schema)
    .addValues("user", address)
    .addArray(transactionOne, transactionTwo)
    .build();

PCollection<Row> input = executionContext
    .getPipeline()
    .apply(Create.of(row))
    .setRowSchema(schema);
Here is the transform code:
input.apply(Select.flattenedSchema())
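For the single input row built above, the guide's description would imply flattened values along the following lines. This is a plain-Java sketch that models rows as maps and arrays as lists; it is not Beam code, the FlattenRowSketch class and its helpers are hypothetical, and it only handles one level of array-of-row nesting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlattenRowSketch {
    /** Flattens nested maps to name_child keys; arrays of rows become per-field lists. */
    static Map<String, Object> flatten(Map<String, Object> row) {
        Map<String, Object> out = new LinkedHashMap<>();
        flattenInto("", row, out);
        return out;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, Object> row, Map<String, Object> out) {
        for (Map.Entry<String, Object> e : row.entrySet()) {
            String name = prefix.isEmpty() ? e.getKey() : prefix + "_" + e.getKey();
            Object value = e.getValue();
            if (value instanceof Map) {
                // Nested row: recurse with the field name as prefix.
                flattenInto(name, (Map<String, Object>) value, out);
            } else if (value instanceof List && isListOfRows((List<?>) value)) {
                // Array of rows: collect each nested field into its own list.
                for (Object element : (List<?>) value) {
                    for (Map.Entry<String, Object> f : ((Map<String, Object>) element).entrySet()) {
                        List<Object> column = (List<Object>) out.computeIfAbsent(
                            name + "_" + f.getKey(), k -> new ArrayList<Object>());
                        column.add(f.getValue());
                    }
                }
            } else {
                // Primitive value, or an array that does not contain rows.
                out.put(name, value);
            }
        }
    }

    private static boolean isListOfRows(List<?> list) {
        if (list.isEmpty()) {
            return false;
        }
        for (Object o : list) {
            if (!(o instanceof Map)) {
                return false;
            }
        }
        return true;
    }

    /** Builds an insertion-ordered map from alternating key/value arguments. */
    static Map<String, Object> row(Object... keyValues) {
        Map<String, Object> m = new LinkedHashMap<>();
        for (int i = 0; i < keyValues.length; i += 2) {
            m.put((String) keyValues[i], keyValues[i + 1]);
        }
        return m;
    }

    /** The input row from this report, modeled with maps and lists. */
    static Map<String, Object> exampleRow() {
        return row(
            "userId", "user",
            "shippingAddress", row("streetAddress", "street", "city", "city"),
            "transactions", Arrays.asList(
                row("bank", "foo", "purchaseAmount", 1.0),
                row("bank", "bar", "purchaseAmount", 2.0)));
    }

    public static void main(String[] args) {
        flatten(exampleRow()).forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

Under these assumptions, transactions_bank would hold the two bank values ("foo", "bar") and transactions_purchaseAmount the two amounts (1.0, 2.0), each as a top-level array.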
Re: Potential bug with Select.Flattened?
Posted by Tao Li <ta...@zillow.com>.
@Brian Hulette thanks for the quick response! It's about the "transactions" field, which is an array of rows. Each row has a "bank" field and a "purchaseAmount" field.
From: Brian Hulette <bh...@google.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Friday, January 8, 2021 at 10:29 AM
To: user <us...@beam.apache.org>, dev <de...@beam.apache.org>
Subject: Re: Potential bug with Select.Flattened?
+dev
Hey Tao,
Thanks for reporting this, it does look like a bug. Your description "flatten the nested fields of an array" threw me off a little bit, it looks like what's failing is flattening a struct that contains array fields. Does that sound right to you?
I filed BEAM-11585 [1] to track this.
Brian
[1] https://issues.apache.org/jira/browse/BEAM-11585
Re: Potential bug with Select.Flattened?
Posted by Brian Hulette <bh...@google.com>.
+dev <de...@beam.apache.org>
Hey Tao,
Thanks for reporting this, it does look like a bug. Your description
"flatten the nested fields of an array" threw me off a little bit, it looks
like what's failing is flattening a struct that contains array fields. Does
that sound right to you?
I filed BEAM-11585 [1] to track this.
Brian
[1] https://issues.apache.org/jira/browse/BEAM-11585