Posted to dev@iceberg.apache.org by Sandeep Sagar <sa...@meltwater.com> on 2019/08/22 18:39:26 UTC

NPE in org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)

Hello,
Newbie here. I need help figuring out the issue below.
I am doing a simple local Spark save using Iceberg with S3.
I see that my metadata folder was created in S3, so my schema/table
creation was successful.
When I try to run a Spark write, I get a NullPointerException at

java.lang.NullPointerException
at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)
at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:28)
at org.apache.iceberg.types.TypeUtil$VisitFieldFuture.get(TypeUtil.java:331)
at com.google.common.collect.Iterators$6.transform(Iterators.java:783)
at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
at com.google.common.collect.Iterators.addAll(Iterators.java:356)
at com.google.common.collect.Lists.newArrayList(Lists.java:143)
at com.google.common.collect.Lists.newArrayList(Lists.java:130)
at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:55)
at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:28)
at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:364)
at org.apache.iceberg.types.TypeUtil$VisitFuture.get(TypeUtil.java:316)
at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:40)
at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:28)
at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:336)
at org.apache.iceberg.types.TypeUtil.reassignIds(TypeUtil.java:137)
at org.apache.iceberg.spark.SparkSchemaUtil.convert(SparkSchemaUtil.java:163)
at org.apache.iceberg.spark.source.IcebergSource.validateWriteSchema(IcebergSource.java:146)
at org.apache.iceberg.spark.source.IcebergSource.createWriter(IcebergSource.java:76)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:255)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)

Maybe I am making a mistake in schema creation?

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

new Schema(
        required(1, "mwId", Types.StringType.get()),
        required(2, "mwVersion", Types.LongType.get()),
        required(3, "id", Types.LongType.get()),
        required(4, "id_str", Types.StringType.get()),
        optional(5, "text", Types.StringType.get()),
        optional(6, "created_at", Types.StringType.get()),
        optional(7, "lang", Types.StringType.get())
);

PartitionSpec I used was PartitionSpec.unpartitioned();
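
For context, a minimal sketch of how the table creation mentioned above might look, assuming the HadoopTables API and reusing getTableLocation() from the write snippet below (the table-creation code is not shown in this thread):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

// Assumption: the table was created directly against the S3 location.
// This is the call that writes the metadata folder mentioned above.
Schema schema = new Schema(/* fields as above */);
HadoopTables tables = new HadoopTables(new Configuration());
Table table = tables.create(schema, PartitionSpec.unpartitioned(), getTableLocation());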

The write code I used was:

Dataset<TweetItem> myDS;

...... (populate myDS)

myDS.write()
        .format("iceberg")
        .mode("append")
        .save(getTableLocation());


If I call printSchema(), I get:

root
 |-- columns: struct (nullable = true)
 |-- created_at: string (nullable = true)
 |-- encoder: struct (nullable = true)
 |-- id: long (nullable = true)
 |-- id_str: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- mwId: string (nullable = true)
 |-- mwVersion: long (nullable = true)
 |-- partitionSpec: struct (nullable = true)
 |-- schema: struct (nullable = true)
 |    |-- aliases: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: integer (valueContainsNull = true)
 |-- tableLocation: string (nullable = true)
 |-- text: string (nullable = true)

Appreciate your help.

regards
Sandeep


Re: NPE in org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)

Posted by Sandeep Sagar <sa...@meltwater.com>.
Thank you, I will try to use coalesce, as my data does not contain any nulls.
-Sandeep

On Thu, Aug 22, 2019 at 3:34 PM Ryan Blue <rb...@netflix.com> wrote:

> Datasets work fine with Iceberg. The problem was that your dataset’s
> schema was incompatible with the table schema you created because your
> dataset could contain null values for required fields in your table.
>
> All you should need to do is guarantee that those fields do not contain
> null values by using coalesce(nullable_col, "default-val") or filtering
> out the rows with nulls.
>
> On Thu, Aug 22, 2019 at 3:04 PM Sandeep Sagar <sa...@meltwater.com>
> wrote:
>
>> Hi Ryan,
>> I changed my code to use DataFrame instead of Dataset and it worked as I
>> was able to specify a Spark Schema.
>>
>> Does it mean we cannot use Datasets with Iceberg or I am using it
>> incorrectly?
>>
>> including the working code below:
>>
>> thanks
>> Sandeep
>>
>> List<Row> rows =  new ArrayList<Row>(tweets.size());
>>
>> tweets.stream().forEach( c -> {
>>     rows.add(c.getSparkRow());
>> });
>>
>> Dataset<Row> tweetRows= dlS3Connector.getSparkSession().createDataFrame(rows, TweetItem.getSparkSchema());
>>
>> tweetRows.write()
>>         .format("iceberg")
>>         .mode("append")
>>         .save(TweetItem.getTableLocation());
>>
>>
>> On Thu, Aug 22, 2019 at 1:07 PM Sandeep Sagar <
>> sandeep.sagar@meltwater.com> wrote:
>>
>>> This is how I created the dataset:
>>> tweet1, tweet2 are 2 objects I created by setting every member attribute.
>>>
>>> List<TweetItem> tweets= Arrays.asList(tweet1, tweet2);
>>>
>>> Dataset<TweetItem> dsTweets = dlS3Connector.getSparkSession().createDataset(tweets, Encoders.bean(TweetItem.class));
>>>
>>> The "columns" shows up if I do PrintSchema on dsTweets as
>>>
>>> dsTweets.printSchema();
>>>
>>> root
>>>  |-- columns: struct (nullable = true)
>>>  |-- created_at: string (nullable = true)
>>>  |-- encoder: struct (nullable = true)
>>>  |-- id: long (nullable = true)
>>>  |-- id_str: string (nullable = true)
>>>  |-- lang: string (nullable = true)
>>>  |-- mwId: string (nullable = true)
>>>  |-- mwVersion: long (nullable = true)
>>>  |-- partitionSpec: struct (nullable = true)
>>>  |-- schema: struct (nullable = true)
>>>  |    |-- aliases: map (nullable = true)
>>>  |    |    |-- key: string
>>>  |    |    |-- value: integer (valueContainsNull = true)
>>>  |-- tableLocation: string (nullable = true)
>>>  |-- text: string (nullable = true)
>>>
>>> So it seems to be inserted by createDataset API.
>>>
>>> Hope this clarifies.
>>>
>>> thanks
>>>
>>> Sandeep
>>>
>>>
>>> On Thu, Aug 22, 2019 at 12:51 PM Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> I don't know how you produced the dataset so I can't tell you how you
>>>> had a "columns" column. But I can help recommend a solution to the
>>>> required/optional problem.
>>>>
>>>> I think what's happening is that Spark doesn't know whether your
>>>> dataset contains nulls for those columns or not. To ensure that they don't
>>>> contain nulls, you should be able to filter out rows where those are null.
>>>> Alternatively, you can add a coalesce with the column and a non-null value
>>>> to set the default if the column contains a null. Then Spark should detect
>>>> that the column can't contain nulls and should be able to write.
>>>>
>>>> On Thu, Aug 22, 2019 at 12:39 PM Sandeep Sagar <
>>>> sandeep.sagar@meltwater.com> wrote:
>>>>
>>>>> I tried what you suggested and it went past that, but ran into the
>>>>> following -
>>>>> java.lang.IllegalArgumentException: Cannot write incompatible
>>>>> dataframe to table with schema:
>>>>> table {
>>>>>   1: mwId: required string
>>>>>   2: mwVersion: required long
>>>>>   3: id: required long
>>>>>   4: id_str: required string
>>>>>   5: text: optional string
>>>>>   6: created_at: optional string
>>>>>   7: lang: optional string
>>>>> }
>>>>> Problems:
>>>>> * mwId should be required, but is optional
>>>>> * mwVersion should be required, but is optional
>>>>> * id should be required, but is optional
>>>>> * id_str should be required, but is optional
>>>>>
>>>>> So, If I create a Dataset by using a select, it marks every column by
>>>>> default as optional and I don't see in the java doc as to how to reflect
>>>>> the schema in this.
>>>>> Also, when I was debugging the earlier issue, I noticed that it is
>>>>> querying the fieldStruct for a field name "columns", which is present in
>>>>> the schema but not in my incoming dataset and hence the NPE.  Why would
>>>>> incoming data have that?
>>>>>
>>>>> thanks
>>>>> Sandeep
>>>>>
>>>>> On Thu, Aug 22, 2019 at 12:13 PM Ryan Blue <rb...@netflix.com.invalid>
>>>>> wrote:
>>>>>
>>>>>> Hi Sandeep,
>>>>>>
>>>>>> It looks like the problem is that your schema doesn’t match. There
>>>>>> are columns in your dataset that don’t appear in your table schema, like
>>>>>> tableLocation. When Iceberg tries to match up the dataset’s schema with the
>>>>>> table’s schema, it can’t find those fields by name and hits an error.
>>>>>>
>>>>>> I think if you add a select, it should work:
>>>>>>
>>>>>> myDS.select("mwId", "mwVersion", "id", "id_str", "text", "created_at", "lang").write()
>>>>>>         .format("iceberg")
>>>>>>         .mode("append")
>>>>>>         .save(getTableLocation())
>>>>>>
>>>>>> Iceberg has code to catch the schema mismatch and throw an exception,
>>>>>> but it looks like it runs after the point where this is failing. We should
>>>>>> fix Iceberg to correctly assign IDs so you get a better error message. I’ll
>>>>>> open an issue for this.
>>>>>>
>>>>>> Another fix is also coming in the next Spark release. In 2.4, Spark
>>>>>> doesn’t validate the schema of a dataset when writing. That is fixed in
>>>>>> master and will be in the 3.0 release.
>>>>>>
>>>>>> rb
>>>>>>
>>>>>> On Thu, Aug 22, 2019 at 11:39 AM Sandeep Sagar <
>>>>>> sandeep.sagar@meltwater.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>> Newbie here.Need help to figure out the issue here.
>>>>>>> Doing a simple local spark Save using Iceberg with S3.
>>>>>>> I see that my metadata folder was created in S3, so my schema/table
>>>>>>> creation was successful.
>>>>>>> When I try to run a Spark Write, I get a NullPointerException at
>>>>>>>
>>>>>>> java.lang.NullPointerException
>>>>>>> at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)
>>>>>>> at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:28)
>>>>>>> at
>>>>>>> org.apache.iceberg.types.TypeUtil$VisitFieldFuture.get(TypeUtil.java:331)
>>>>>>> at
>>>>>>> com.google.common.collect.Iterators$6.transform(Iterators.java:783)
>>>>>>> at
>>>>>>> com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>>>>>> at com.google.common.collect.Iterators.addAll(Iterators.java:356)
>>>>>>> at com.google.common.collect.Lists.newArrayList(Lists.java:143)
>>>>>>> at com.google.common.collect.Lists.newArrayList(Lists.java:130)
>>>>>>> at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:55)
>>>>>>> at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:28)
>>>>>>> at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:364)
>>>>>>> at
>>>>>>> org.apache.iceberg.types.TypeUtil$VisitFuture.get(TypeUtil.java:316)
>>>>>>> at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:40)
>>>>>>> at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:28)
>>>>>>> at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:336)
>>>>>>> at org.apache.iceberg.types.TypeUtil.reassignIds(TypeUtil.java:137)
>>>>>>> at
>>>>>>> org.apache.iceberg.spark.SparkSchemaUtil.convert(SparkSchemaUtil.java:163)
>>>>>>> at
>>>>>>> org.apache.iceberg.spark.source.IcebergSource.validateWriteSchema(IcebergSource.java:146)
>>>>>>> at
>>>>>>> org.apache.iceberg.spark.source.IcebergSource.createWriter(IcebergSource.java:76)
>>>>>>> at
>>>>>>> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:255)
>>>>>>> at
>>>>>>> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>>>>>>>
>>>>>>> Maybe I am making a mistake in Schema Creation?
>>>>>>>
>>>>>>> new Schema(
>>>>>>>         required(1, "mwId", Types.StringType.get()),
>>>>>>>         required(2, "mwVersion", LongType.get()),
>>>>>>>         required(3, "id", Types.LongType.get()),
>>>>>>>         required(4, "id_str", Types.StringType.get()),
>>>>>>>         optional(5, "text", Types.StringType.get()),
>>>>>>>         optional(6, "created_at", Types.StringType.get()),
>>>>>>>         optional(7, "lang", Types.StringType.get())
>>>>>>> );
>>>>>>>
>>>>>>> PartitionSpec I used was PartitionSpec.unpartitioned();
>>>>>>>
>>>>>>> The write code I used was:
>>>>>>>
>>>>>>> Dataset<TweetItem> myDS;
>>>>>>>
>>>>>>> ...... (populate myDS)
>>>>>>>
>>>>>>> myDS.write()
>>>>>>>         .format("iceberg")
>>>>>>>         .mode("append")
>>>>>>>         .save(getTableLocation());
>>>>>>>
>>>>>>>
>>>>>>> If I do a PrintSchema, I get:
>>>>>>>
>>>>>>> root
>>>>>>>  |-- columns: struct (nullable = true)
>>>>>>>  |-- created_at: string (nullable = true)
>>>>>>>  |-- encoder: struct (nullable = true)
>>>>>>>  |-- id: long (nullable = true)
>>>>>>>  |-- id_str: string (nullable = true)
>>>>>>>  |-- lang: string (nullable = true)
>>>>>>>  |-- mwId: string (nullable = true)
>>>>>>>  |-- mwVersion: long (nullable = true)
>>>>>>>  |-- partitionSpec: struct (nullable = true)
>>>>>>>  |-- schema: struct (nullable = true)
>>>>>>>  |    |-- aliases: map (nullable = true)
>>>>>>>  |    |    |-- key: string
>>>>>>>  |    |    |-- value: integer (valueContainsNull = true)
>>>>>>>  |-- tableLocation: string (nullable = true)
>>>>>>>  |-- text: string (nullable = true)
>>>>>>>
>>>>>>> Appreciate your help.
>>>>>>>
>>>>>>> regards
>>>>>>> Sandeep
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: NPE in org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Datasets work fine with Iceberg. The problem was that your dataset’s schema
was incompatible with the table schema you created because your dataset
could contain null values for required fields in your table.

All you should need to do is guarantee that those fields do not contain
null values by using coalesce(nullable_col, "default-val") or filtering out
the rows with nulls.
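
For illustration, a sketch of the coalesce approach in the Java API (column
names are from this thread; the default values are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.apache.spark.sql.functions.coalesce;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

// coalesce(column, non-null literal) is non-nullable, so Spark can report
// these columns as required when Iceberg validates the write schema.
Dataset<Row> nonNull = myDS.toDF()
        .withColumn("mwId", coalesce(col("mwId"), lit("unknown")))
        .withColumn("mwVersion", coalesce(col("mwVersion"), lit(0L)))
        .withColumn("id", coalesce(col("id"), lit(0L)))
        .withColumn("id_str", coalesce(col("id_str"), lit("0")));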

On Thu, Aug 22, 2019 at 3:04 PM Sandeep Sagar <sa...@meltwater.com>
wrote:

> Hi Ryan,
> I changed my code to use DataFrame instead of Dataset and it worked as I
> was able to specify a Spark Schema.
>
> Does it mean we cannot use Datasets with Iceberg or I am using it
> incorrectly?
>
> including the working code below:
>
> thanks
> Sandeep
>
> List<Row> rows =  new ArrayList<Row>(tweets.size());
>
> tweets.stream().forEach( c -> {
>     rows.add(c.getSparkRow());
> });
>
> Dataset<Row> tweetRows= dlS3Connector.getSparkSession().createDataFrame(rows, TweetItem.getSparkSchema());
>
> tweetRows.write()
>         .format("iceberg")
>         .mode("append")
>         .save(TweetItem.getTableLocation());
>
>
> On Thu, Aug 22, 2019 at 1:07 PM Sandeep Sagar <sa...@meltwater.com>
> wrote:
>
>> This is how I created the dataset:
>> tweet1, tweet2 are 2 objects I created by setting every member attribute.
>>
>> List<TweetItem> tweets= Arrays.asList(tweet1, tweet2);
>>
>> Dataset<TweetItem> dsTweets = dlS3Connector.getSparkSession().createDataset(tweets, Encoders.bean(TweetItem.class));
>>
>> The "columns" shows up if I do PrintSchema on dsTweets as
>>
>> dsTweets.printSchema();
>>
>> root
>>  |-- columns: struct (nullable = true)
>>  |-- created_at: string (nullable = true)
>>  |-- encoder: struct (nullable = true)
>>  |-- id: long (nullable = true)
>>  |-- id_str: string (nullable = true)
>>  |-- lang: string (nullable = true)
>>  |-- mwId: string (nullable = true)
>>  |-- mwVersion: long (nullable = true)
>>  |-- partitionSpec: struct (nullable = true)
>>  |-- schema: struct (nullable = true)
>>  |    |-- aliases: map (nullable = true)
>>  |    |    |-- key: string
>>  |    |    |-- value: integer (valueContainsNull = true)
>>  |-- tableLocation: string (nullable = true)
>>  |-- text: string (nullable = true)
>>
>> So it seems to be inserted by createDataset API.
>>
>> Hope this clarifies.
>>
>> thanks
>>
>> Sandeep
>>
>>
>> On Thu, Aug 22, 2019 at 12:51 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> I don't know how you produced the dataset so I can't tell you how you
>>> had a "columns" column. But I can help recommend a solution to the
>>> required/optional problem.
>>>
>>> I think what's happening is that Spark doesn't know whether your dataset
>>> contains nulls for those columns or not. To ensure that they don't contain
>>> nulls, you should be able to filter out rows where those are null.
>>> Alternatively, you can add a coalesce with the column and a non-null value
>>> to set the default if the column contains a null. Then Spark should detect
>>> that the column can't contain nulls and should be able to write.
>>>
>>> On Thu, Aug 22, 2019 at 12:39 PM Sandeep Sagar <
>>> sandeep.sagar@meltwater.com> wrote:
>>>
>>>> I tried what you suggested and it went past that, but ran into the
>>>> following -
>>>> java.lang.IllegalArgumentException: Cannot write incompatible dataframe
>>>> to table with schema:
>>>> table {
>>>>   1: mwId: required string
>>>>   2: mwVersion: required long
>>>>   3: id: required long
>>>>   4: id_str: required string
>>>>   5: text: optional string
>>>>   6: created_at: optional string
>>>>   7: lang: optional string
>>>> }
>>>> Problems:
>>>> * mwId should be required, but is optional
>>>> * mwVersion should be required, but is optional
>>>> * id should be required, but is optional
>>>> * id_str should be required, but is optional
>>>>
>>>> So, If I create a Dataset by using a select, it marks every column by
>>>> default as optional and I don't see in the java doc as to how to reflect
>>>> the schema in this.
>>>> Also, when I was debugging the earlier issue, I noticed that it is
>>>> querying the fieldStruct for a field name "columns", which is present in
>>>> the schema but not in my incoming dataset and hence the NPE.  Why would
>>>> incoming data have that?
>>>>
>>>> thanks
>>>> Sandeep
>>>>
>>>> On Thu, Aug 22, 2019 at 12:13 PM Ryan Blue <rb...@netflix.com.invalid>
>>>> wrote:
>>>>
>>>>> Hi Sandeep,
>>>>>
>>>>> It looks like the problem is that your schema doesn’t match. There are
>>>>> columns in your dataset that don’t appear in your table schema, like
>>>>> tableLocation. When Iceberg tries to match up the dataset’s schema with the
>>>>> table’s schema, it can’t find those fields by name and hits an error.
>>>>>
>>>>> I think if you add a select, it should work:
>>>>>
>>>>> myDS.select("mwId", "mwVersion", "id", "id_str", "text", "created_at", "lang").write()
>>>>>         .format("iceberg")
>>>>>         .mode("append")
>>>>>         .save(getTableLocation())
>>>>>
>>>>> Iceberg has code to catch the schema mismatch and throw an exception,
>>>>> but it looks like it runs after the point where this is failing. We should
>>>>> fix Iceberg to correctly assign IDs so you get a better error message. I’ll
>>>>> open an issue for this.
>>>>>
>>>>> Another fix is also coming in the next Spark release. In 2.4, Spark
>>>>> doesn’t validate the schema of a dataset when writing. That is fixed in
>>>>> master and will be in the 3.0 release.
>>>>>
>>>>> rb
>>>>>
>>>>> On Thu, Aug 22, 2019 at 11:39 AM Sandeep Sagar <
>>>>> sandeep.sagar@meltwater.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>> Newbie here.Need help to figure out the issue here.
>>>>>> Doing a simple local spark Save using Iceberg with S3.
>>>>>> I see that my metadata folder was created in S3, so my schema/table
>>>>>> creation was successful.
>>>>>> When I try to run a Spark Write, I get a NullPointerException at
>>>>>>
>>>>>> java.lang.NullPointerException
>>>>>> at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)
>>>>>> at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:28)
>>>>>> at
>>>>>> org.apache.iceberg.types.TypeUtil$VisitFieldFuture.get(TypeUtil.java:331)
>>>>>> at com.google.common.collect.Iterators$6.transform(Iterators.java:783)
>>>>>> at
>>>>>> com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>>>>> at com.google.common.collect.Iterators.addAll(Iterators.java:356)
>>>>>> at com.google.common.collect.Lists.newArrayList(Lists.java:143)
>>>>>> at com.google.common.collect.Lists.newArrayList(Lists.java:130)
>>>>>> at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:55)
>>>>>> at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:28)
>>>>>> at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:364)
>>>>>> at
>>>>>> org.apache.iceberg.types.TypeUtil$VisitFuture.get(TypeUtil.java:316)
>>>>>> at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:40)
>>>>>> at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:28)
>>>>>> at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:336)
>>>>>> at org.apache.iceberg.types.TypeUtil.reassignIds(TypeUtil.java:137)
>>>>>> at
>>>>>> org.apache.iceberg.spark.SparkSchemaUtil.convert(SparkSchemaUtil.java:163)
>>>>>> at
>>>>>> org.apache.iceberg.spark.source.IcebergSource.validateWriteSchema(IcebergSource.java:146)
>>>>>> at
>>>>>> org.apache.iceberg.spark.source.IcebergSource.createWriter(IcebergSource.java:76)
>>>>>> at
>>>>>> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:255)
>>>>>> at
>>>>>> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>>>>>>
>>>>>> Maybe I am making a mistake in Schema Creation?
>>>>>>
>>>>>> new Schema(
>>>>>>         required(1, "mwId", Types.StringType.get()),
>>>>>>         required(2, "mwVersion", LongType.get()),
>>>>>>         required(3, "id", Types.LongType.get()),
>>>>>>         required(4, "id_str", Types.StringType.get()),
>>>>>>         optional(5, "text", Types.StringType.get()),
>>>>>>         optional(6, "created_at", Types.StringType.get()),
>>>>>>         optional(7, "lang", Types.StringType.get())
>>>>>> );
>>>>>>
>>>>>> PartitionSpec I used was PartitionSpec.unpartitioned();
>>>>>>
>>>>>> The write code I used was:
>>>>>>
>>>>>> Dataset<TweetItem> myDS;
>>>>>>
>>>>>> ...... (populate myDS)
>>>>>>
>>>>>> myDS.write()
>>>>>>         .format("iceberg")
>>>>>>         .mode("append")
>>>>>>         .save(getTableLocation());
>>>>>>
>>>>>>
>>>>>> If I do a PrintSchema, I get:
>>>>>>
>>>>>> root
>>>>>>  |-- columns: struct (nullable = true)
>>>>>>  |-- created_at: string (nullable = true)
>>>>>>  |-- encoder: struct (nullable = true)
>>>>>>  |-- id: long (nullable = true)
>>>>>>  |-- id_str: string (nullable = true)
>>>>>>  |-- lang: string (nullable = true)
>>>>>>  |-- mwId: string (nullable = true)
>>>>>>  |-- mwVersion: long (nullable = true)
>>>>>>  |-- partitionSpec: struct (nullable = true)
>>>>>>  |-- schema: struct (nullable = true)
>>>>>>  |    |-- aliases: map (nullable = true)
>>>>>>  |    |    |-- key: string
>>>>>>  |    |    |-- value: integer (valueContainsNull = true)
>>>>>>  |-- tableLocation: string (nullable = true)
>>>>>>  |-- text: string (nullable = true)
>>>>>>
>>>>>> Appreciate your help.
>>>>>>
>>>>>> regards
>>>>>> Sandeep
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>



-- 
Ryan Blue
Software Engineer
Netflix

Re: NPE in org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)

Posted by Sandeep Sagar <sa...@meltwater.com>.
Hi Ryan,
I changed my code to use DataFrame instead of Dataset, and it worked because I
was able to specify a Spark schema.

Does this mean we cannot use Datasets with Iceberg, or am I using them
incorrectly?

Including the working code below:

thanks
Sandeep

List<Row> rows = new ArrayList<Row>(tweets.size());

tweets.stream().forEach(c -> {
    rows.add(c.getSparkRow());
});

Dataset<Row> tweetRows = dlS3Connector.getSparkSession().createDataFrame(rows, TweetItem.getSparkSchema());

tweetRows.write()
        .format("iceberg")
        .mode("append")
        .save(TweetItem.getTableLocation());
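
For reference, a hypothetical sketch of what TweetItem.getSparkSchema() could
return (the thread does not show it); the nullable=false flags on the first
four fields are what let Iceberg accept them as required:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public static StructType getSparkSchema() {
    return new StructType(new StructField[] {
            new StructField("mwId", DataTypes.StringType, false, Metadata.empty()),
            new StructField("mwVersion", DataTypes.LongType, false, Metadata.empty()),
            new StructField("id", DataTypes.LongType, false, Metadata.empty()),
            new StructField("id_str", DataTypes.StringType, false, Metadata.empty()),
            new StructField("text", DataTypes.StringType, true, Metadata.empty()),
            new StructField("created_at", DataTypes.StringType, true, Metadata.empty()),
            new StructField("lang", DataTypes.StringType, true, Metadata.empty())
    });
}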


On Thu, Aug 22, 2019 at 1:07 PM Sandeep Sagar <sa...@meltwater.com>
wrote:

> This is how I created the dataset:
> tweet1, tweet2 are 2 objects I created by setting every member attribute.
>
> List<TweetItem> tweets= Arrays.asList(tweet1, tweet2);
>
> Dataset<TweetItem> dsTweets = dlS3Connector.getSparkSession().createDataset(tweets, Encoders.bean(TweetItem.class));
>
> The "columns" shows up if I do PrintSchema on dsTweets as
>
> dsTweets.printSchema();
>
> root
>  |-- columns: struct (nullable = true)
>  |-- created_at: string (nullable = true)
>  |-- encoder: struct (nullable = true)
>  |-- id: long (nullable = true)
>  |-- id_str: string (nullable = true)
>  |-- lang: string (nullable = true)
>  |-- mwId: string (nullable = true)
>  |-- mwVersion: long (nullable = true)
>  |-- partitionSpec: struct (nullable = true)
>  |-- schema: struct (nullable = true)
>  |    |-- aliases: map (nullable = true)
>  |    |    |-- key: string
>  |    |    |-- value: integer (valueContainsNull = true)
>  |-- tableLocation: string (nullable = true)
>  |-- text: string (nullable = true)
>
> So it seems to be inserted by createDataset API.
>
> Hope this clarifies.
>
> thanks
>
> Sandeep
>
>
> On Thu, Aug 22, 2019 at 12:51 PM Ryan Blue <rb...@netflix.com> wrote:
>
>> I don't know how you produced the dataset so I can't tell you how you had
>> a "columns" column. But I can help recommend a solution to the
>> required/optional problem.
>>
>> I think what's happening is that Spark doesn't know whether your dataset
>> contains nulls for those columns or not. To ensure that they don't contain
>> nulls, you should be able to filter out rows where those are null.
>> Alternatively, you can add a coalesce with the column and a non-null value
>> to set the default if the column contains a null. Then Spark should detect
>> that the column can't contain nulls and should be able to write.
>>
>> On Thu, Aug 22, 2019 at 12:39 PM Sandeep Sagar <
>> sandeep.sagar@meltwater.com> wrote:
>>
>>> I tried what you suggested and it went past that, but ran into the
>>> following -
>>> java.lang.IllegalArgumentException: Cannot write incompatible dataframe
>>> to table with schema:
>>> table {
>>>   1: mwId: required string
>>>   2: mwVersion: required long
>>>   3: id: required long
>>>   4: id_str: required string
>>>   5: text: optional string
>>>   6: created_at: optional string
>>>   7: lang: optional string
>>> }
>>> Problems:
>>> * mwId should be required, but is optional
>>> * mwVersion should be required, but is optional
>>> * id should be required, but is optional
>>> * id_str should be required, but is optional
>>>
>>> So, If I create a Dataset by using a select, it marks every column by
>>> default as optional and I don't see in the java doc as to how to reflect
>>> the schema in this.
>>> Also, when I was debugging the earlier issue, I noticed that it is
>>> querying the fieldStruct for a field name "columns", which is present in
>>> the schema but not in my incoming dataset and hence the NPE.  Why would
>>> incoming data have that?
>>>
>>> thanks
>>> Sandeep
>>>
>>> On Thu, Aug 22, 2019 at 12:13 PM Ryan Blue <rb...@netflix.com.invalid>
>>> wrote:
>>>
>>>> Hi Sandeep,
>>>>
>>>> It looks like the problem is that your schema doesn’t match. There are
>>>> columns in your dataset that don’t appear in your table schema, like
>>>> tableLocation. When Iceberg tries to match up the dataset’s schema with the
>>>> table’s schema, it can’t find those fields by name and hits an error.
>>>>
>>>> I think if you add a select, it should work:
>>>>
>>>> myDS.select("mwId", "mwVersion", "id", "id_str", "text", "created_at", "lang").write()
>>>>         .format("iceberg")
>>>>         .mode("append")
>>>>         .save(getTableLocation())
>>>>
>>>> Iceberg has code to catch the schema mismatch and throw an exception,
>>>> but it looks like it runs after the point where this is failing. We should
>>>> fix Iceberg to correctly assign IDs so you get a better error message. I’ll
>>>> open an issue for this.
>>>>
>>>> Another fix is also coming in the next Spark release. In 2.4, Spark
>>>> doesn’t validate the schema of a dataset when writing. That is fixed in
>>>> master and will be in the 3.0 release.
>>>>
>>>> rb
>>>>
>>>> On Thu, Aug 22, 2019 at 11:39 AM Sandeep Sagar <
>>>> sandeep.sagar@meltwater.com> wrote:
>>>>
>>>>> Hello,
>>>>> Newbie here.Need help to figure out the issue here.
>>>>> Doing a simple local spark Save using Iceberg with S3.
>>>>> I see that my metadata folder was created in S3, so my schema/table
>>>>> creation was successful.
>>>>> When I try to run a Spark Write, I get a NullPointerException at
>>>>>
>>>>> java.lang.NullPointerException
>>>>> at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)
>>>>> at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:28)
>>>>> at
>>>>> org.apache.iceberg.types.TypeUtil$VisitFieldFuture.get(TypeUtil.java:331)
>>>>> at com.google.common.collect.Iterators$6.transform(Iterators.java:783)
>>>>> at
>>>>> com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>>>> at com.google.common.collect.Iterators.addAll(Iterators.java:356)
>>>>> at com.google.common.collect.Lists.newArrayList(Lists.java:143)
>>>>> at com.google.common.collect.Lists.newArrayList(Lists.java:130)
>>>>> at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:55)
>>>>> at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:28)
>>>>> at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:364)
>>>>> at org.apache.iceberg.types.TypeUtil$VisitFuture.get(TypeUtil.java:316)
>>>>> at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:40)
>>>>> at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:28)
>>>>> at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:336)
>>>>> at org.apache.iceberg.types.TypeUtil.reassignIds(TypeUtil.java:137)
>>>>> at
>>>>> org.apache.iceberg.spark.SparkSchemaUtil.convert(SparkSchemaUtil.java:163)
>>>>> at
>>>>> org.apache.iceberg.spark.source.IcebergSource.validateWriteSchema(IcebergSource.java:146)
>>>>> at
>>>>> org.apache.iceberg.spark.source.IcebergSource.createWriter(IcebergSource.java:76)
>>>>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:255)
>>>>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>>>>>
>>>>> Maybe I am making a mistake in Schema Creation?
>>>>>
>>>>> new Schema(
>>>>>         required(1, "mwId", Types.StringType.get()),
>>>>>         required(2, "mwVersion", LongType.get()),
>>>>>         required(3, "id", Types.LongType.get()),
>>>>>         required(4, "id_str", Types.StringType.get()),
>>>>>         optional(5, "text", Types.StringType.get()),
>>>>>         optional(6, "created_at", Types.StringType.get()),
>>>>>         optional(7, "lang", Types.StringType.get())
>>>>> );
>>>>>
>>>>> PartitionSpec I used was PartitionSpec.unpartitioned();
>>>>>
>>>>> The write code I used was:
>>>>>
>>>>> Dataset<TweetItem> myDS;
>>>>>
>>>>> ...... (populate myDS)
>>>>>
>>>>> myDS.write()
>>>>>         .format("iceberg")
>>>>>         .mode("append")
>>>>>         .save(getTableLocation());
>>>>>
>>>>>
>>>>> If I do a PrintSchema, I get:
>>>>>
>>>>> root
>>>>>  |-- columns: struct (nullable = true)
>>>>>  |-- created_at: string (nullable = true)
>>>>>  |-- encoder: struct (nullable = true)
>>>>>  |-- id: long (nullable = true)
>>>>>  |-- id_str: string (nullable = true)
>>>>>  |-- lang: string (nullable = true)
>>>>>  |-- mwId: string (nullable = true)
>>>>>  |-- mwVersion: long (nullable = true)
>>>>>  |-- partitionSpec: struct (nullable = true)
>>>>>  |-- schema: struct (nullable = true)
>>>>>  |    |-- aliases: map (nullable = true)
>>>>>  |    |    |-- key: string
>>>>>  |    |    |-- value: integer (valueContainsNull = true)
>>>>>  |-- tableLocation: string (nullable = true)
>>>>>  |-- text: string (nullable = true)
>>>>>
>>>>> Appreciate your help.
>>>>>
>>>>> regards
>>>>> Sandeep
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>


Re: NPE in org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
I don't know how you produced the dataset, so I can't tell you how you
had a "columns" column. But I can recommend a solution to the
required/optional problem.

I think what's happening is that Spark doesn't know whether your dataset
contains nulls for those columns or not. To ensure that they don't contain
nulls, you should be able to filter out rows where those are null.
Alternatively, you can add a coalesce with the column and a non-null value
to set the default if the column contains a null. Then Spark should detect
that the column can't contain nulls and should be able to write.
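
A short sketch of the filtering alternative in the Java API (df stands for the
dataset after toDF(); na().drop removes rows that are null in the listed
columns):

// Drop rows with nulls in any of the required columns before the write.
Dataset<Row> filtered = df.na().drop(new String[] {"mwId", "mwVersion", "id", "id_str"});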

On Thu, Aug 22, 2019 at 12:39 PM Sandeep Sagar <sa...@meltwater.com>
wrote:

> I tried what you suggested and it went past that, but ran into the
> following -
> java.lang.IllegalArgumentException: Cannot write incompatible dataframe to
> table with schema:
> table {
>   1: mwId: required string
>   2: mwVersion: required long
>   3: id: required long
>   4: id_str: required string
>   5: text: optional string
>   6: created_at: optional string
>   7: lang: optional string
> }
> Problems:
> * mwId should be required, but is optional
> * mwVersion should be required, but is optional
> * id should be required, but is optional
> * id_str should be required, but is optional
>
> So, If I create a Dataset by using a select, it marks every column by
> default as optional and I don't see in the java doc as to how to reflect
> the schema in this.
> Also, when I was debugging the earlier issue, I noticed that it is
> querying the fieldStruct for a field name "columns", which is present in
> the schema but not in my incoming dataset and hence the NPE.  Why would
> incoming data have that?
>
> thanks
> Sandeep
>
> On Thu, Aug 22, 2019 at 12:13 PM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
>> Hi Sandeep,
>>
>> It looks like the problem is that your schema doesn’t match. There are
>> columns in your dataset that don’t appear in your table schema, like
>> tableLocation. When Iceberg tries to match up the dataset’s schema with the
>> table’s schema, it can’t find those fields by name and hits an error.
>>
>> I think if you add a select, it should work:
>>
>> myDS.select("mwId", "mwVersion", "id", "id_str", "text", "created_at", "lang").write()
>>         .format("iceberg")
>>         .mode("append")
>>         .save(getTableLocation())
>>
>> Iceberg has code to catch the schema mismatch and throw an exception, but
>> it looks like it runs after the point where this is failing. We should fix
>> Iceberg to correctly assign IDs so you get a better error message. I’ll
>> open an issue for this.
>>
>> Another fix is also coming in the next Spark release. In 2.4, Spark
>> doesn’t validate the schema of a dataset when writing. That is fixed in
>> master and will be in the 3.0 release.
>>
>> rb
>>
>> On Thu, Aug 22, 2019 at 11:39 AM Sandeep Sagar <
>> sandeep.sagar@meltwater.com> wrote:
>>
>>> Hello,
>>> Newbie here.Need help to figure out the issue here.
>>> Doing a simple local spark Save using Iceberg with S3.
>>> I see that my metadata folder was created in S3, so my schema/table
>>> creation was successful.
>>> When I try to run a Spark Write, I get a NullPointerException at
>>>
>>> java.lang.NullPointerException
>>> at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)
>>> at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:28)
>>> at
>>> org.apache.iceberg.types.TypeUtil$VisitFieldFuture.get(TypeUtil.java:331)
>>> at com.google.common.collect.Iterators$6.transform(Iterators.java:783)
>>> at
>>> com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>> at com.google.common.collect.Iterators.addAll(Iterators.java:356)
>>> at com.google.common.collect.Lists.newArrayList(Lists.java:143)
>>> at com.google.common.collect.Lists.newArrayList(Lists.java:130)
>>> at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:55)
>>> at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:28)
>>> at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:364)
>>> at org.apache.iceberg.types.TypeUtil$VisitFuture.get(TypeUtil.java:316)
>>> at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:40)
>>> at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:28)
>>> at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:336)
>>> at org.apache.iceberg.types.TypeUtil.reassignIds(TypeUtil.java:137)
>>> at
>>> org.apache.iceberg.spark.SparkSchemaUtil.convert(SparkSchemaUtil.java:163)
>>> at
>>> org.apache.iceberg.spark.source.IcebergSource.validateWriteSchema(IcebergSource.java:146)
>>> at
>>> org.apache.iceberg.spark.source.IcebergSource.createWriter(IcebergSource.java:76)
>>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:255)
>>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>>>
>>> Maybe I am making a mistake in Schema Creation?
>>>
>>> new Schema(
>>>         required(1, "mwId", Types.StringType.get()),
>>>         required(2, "mwVersion", LongType.get()),
>>>         required(3, "id", Types.LongType.get()),
>>>         required(4, "id_str", Types.StringType.get()),
>>>         optional(5, "text", Types.StringType.get()),
>>>         optional(6, "created_at", Types.StringType.get()),
>>>         optional(7, "lang", Types.StringType.get())
>>> );
>>>
>>> PartitionSpec I used was PartitionSpec.unpartitioned();
>>>
>>> The write code I used was:
>>>
>>> Dataset<TweetItem> myDS;
>>>
>>> ...... (populate myDS)
>>>
>>> myDS.write()
>>>         .format("iceberg")
>>>         .mode("append")
>>>         .save(getTableLocation());
>>>
>>>
>>> If I do a PrintSchema, I get:
>>>
>>> root
>>>  |-- columns: struct (nullable = true)
>>>  |-- created_at: string (nullable = true)
>>>  |-- encoder: struct (nullable = true)
>>>  |-- id: long (nullable = true)
>>>  |-- id_str: string (nullable = true)
>>>  |-- lang: string (nullable = true)
>>>  |-- mwId: string (nullable = true)
>>>  |-- mwVersion: long (nullable = true)
>>>  |-- partitionSpec: struct (nullable = true)
>>>  |-- schema: struct (nullable = true)
>>>  |    |-- aliases: map (nullable = true)
>>>  |    |    |-- key: string
>>>  |    |    |-- value: integer (valueContainsNull = true)
>>>  |-- tableLocation: string (nullable = true)
>>>  |-- text: string (nullable = true)
>>>
>>> Appreciate your help.
>>>
>>> regards
>>> Sandeep
>>>
>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>



-- 
Ryan Blue
Software Engineer
Netflix

Re: NPE in org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)

Posted by Sandeep Sagar <sa...@meltwater.com>.
I tried what you suggested and it got past that point, but ran into the
following:
java.lang.IllegalArgumentException: Cannot write incompatible dataframe to
table with schema:
table {
  1: mwId: required string
  2: mwVersion: required long
  3: id: required long
  4: id_str: required string
  5: text: optional string
  6: created_at: optional string
  7: lang: optional string
}
Problems:
* mwId should be required, but is optional
* mwVersion should be required, but is optional
* id should be required, but is optional
* id_str should be required, but is optional

So, if I create a Dataset by using a select, it marks every column as
optional by default, and I don't see in the Javadoc how to reflect
the required/optional schema in this.
Also, when I was debugging the earlier issue, I noticed that it was querying
the fieldStruct for a field named "columns", which is present in the schema
but not in my incoming dataset, hence the NPE. Why would the incoming data
have that?

thanks
Sandeep
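
A possible explanation, sketched under the assumption that TweetItem is a
JavaBean: Spark's Encoders.bean maps every public getter on the bean to a
column, so helper getters would surface in the dataset schema.

// Hypothetical TweetItem shape; members beyond the thread's column list
// are assumptions.
public class TweetItem implements java.io.Serializable {
    private String mwId;

    public String getMwId() { return mwId; }
    public void setMwId(String mwId) { this.mwId = mwId; }

    // Not tweet data, but Encoders.bean(TweetItem.class) still maps this
    // getter to a "tableLocation" column; getters like getColumns(),
    // getSchema(), getPartitionSpec(), and getEncoder() would explain the
    // other extra columns in the printed schema.
    public String getTableLocation() { return "s3://..."; }
}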

On Thu, Aug 22, 2019 at 12:13 PM Ryan Blue <rb...@netflix.com.invalid>
wrote:

> Hi Sandeep,
>
> It looks like the problem is that your schema doesn’t match. There are
> columns in your dataset that don’t appear in your table schema, like
> tableLocation. When Iceberg tries to match up the dataset’s schema with the
> table’s schema, it can’t find those fields by name and hits an error.
>
> I think if you add a select, it should work:
>
> myDS.select("mwId", "mwVersion", "id", "id_str", "text", "created_at", "lang").write()
>         .format("iceberg")
>         .mode("append")
>         .save(getTableLocation())
>
> Iceberg has code to catch the schema mismatch and throw an exception, but
> it looks like it runs after the point where this is failing. We should fix
> Iceberg to correctly assign IDs so you get a better error message. I’ll
> open an issue for this.
>
> Another fix is also coming in the next Spark release. In 2.4, Spark
> doesn’t validate the schema of a dataset when writing. That is fixed in
> master and will be in the 3.0 release.
>
> rb
>
> On Thu, Aug 22, 2019 at 11:39 AM Sandeep Sagar <
> sandeep.sagar@meltwater.com> wrote:
>
>> Hello,
>> Newbie here.Need help to figure out the issue here.
>> Doing a simple local spark Save using Iceberg with S3.
>> I see that my metadata folder was created in S3, so my schema/table
>> creation was successful.
>> When I try to run a Spark Write, I get a NullPointerException at
>>
>> java.lang.NullPointerException
>> at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)
>> at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:28)
>> at
>> org.apache.iceberg.types.TypeUtil$VisitFieldFuture.get(TypeUtil.java:331)
>> at com.google.common.collect.Iterators$6.transform(Iterators.java:783)
>> at
>> com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>> at com.google.common.collect.Iterators.addAll(Iterators.java:356)
>> at com.google.common.collect.Lists.newArrayList(Lists.java:143)
>> at com.google.common.collect.Lists.newArrayList(Lists.java:130)
>> at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:55)
>> at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:28)
>> at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:364)
>> at org.apache.iceberg.types.TypeUtil$VisitFuture.get(TypeUtil.java:316)
>> at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:40)
>> at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:28)
>> at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:336)
>> at org.apache.iceberg.types.TypeUtil.reassignIds(TypeUtil.java:137)
>> at
>> org.apache.iceberg.spark.SparkSchemaUtil.convert(SparkSchemaUtil.java:163)
>> at
>> org.apache.iceberg.spark.source.IcebergSource.validateWriteSchema(IcebergSource.java:146)
>> at
>> org.apache.iceberg.spark.source.IcebergSource.createWriter(IcebergSource.java:76)
>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:255)
>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>>
>> Maybe I am making a mistake in Schema Creation?
>>
>> new Schema(
>>         required(1, "mwId", Types.StringType.get()),
>>         required(2, "mwVersion", LongType.get()),
>>         required(3, "id", Types.LongType.get()),
>>         required(4, "id_str", Types.StringType.get()),
>>         optional(5, "text", Types.StringType.get()),
>>         optional(6, "created_at", Types.StringType.get()),
>>         optional(7, "lang", Types.StringType.get())
>> );
>>
>> PartitionSpec I used was PartitionSpec.unpartitioned();
>>
>> The write code I used was:
>>
>> Dataset<TweetItem> myDS;
>>
>> ...... (populate myDS)
>>
>> myDS.write()
>>         .format("iceberg")
>>         .mode("append")
>>         .save(getTableLocation());
>>
>>
>> If I do a PrintSchema, I get:
>>
>> root
>>  |-- columns: struct (nullable = true)
>>  |-- created_at: string (nullable = true)
>>  |-- encoder: struct (nullable = true)
>>  |-- id: long (nullable = true)
>>  |-- id_str: string (nullable = true)
>>  |-- lang: string (nullable = true)
>>  |-- mwId: string (nullable = true)
>>  |-- mwVersion: long (nullable = true)
>>  |-- partitionSpec: struct (nullable = true)
>>  |-- schema: struct (nullable = true)
>>  |    |-- aliases: map (nullable = true)
>>  |    |    |-- key: string
>>  |    |    |-- value: integer (valueContainsNull = true)
>>  |-- tableLocation: string (nullable = true)
>>  |-- text: string (nullable = true)
>>
>> Appreciate your help.
>>
>> regards
>> Sandeep
>>
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: NPE in org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Hi Sandeep,

It looks like the problem is that your schema doesn’t match. There are
columns in your dataset that don’t appear in your table schema, like
tableLocation. When Iceberg tries to match up the dataset’s schema with the
table’s schema, it can’t find those fields by name and hits an error.

I think if you add a select, it should work:

myDS.select("mwId", "mwVersion", "id", "id_str", "text", "created_at",
"lang").write()
        .format("iceberg")
        .mode("append")
        .save(getTableLocation())

Iceberg has code to catch the schema mismatch and throw an exception, but
it looks like it runs after the point where this is failing. We should fix
Iceberg to correctly assign IDs so you get a better error message. I’ll
open an issue for this.

Another fix is also coming in the next Spark release. In 2.4, Spark doesn’t
validate the schema of a dataset when writing. That is fixed in master and
will be in the 3.0 release.

rb

On Thu, Aug 22, 2019 at 11:39 AM Sandeep Sagar <sa...@meltwater.com>
wrote:

> Hello,
> Newbie here.Need help to figure out the issue here.
> Doing a simple local spark Save using Iceberg with S3.
> I see that my metadata folder was created in S3, so my schema/table
> creation was successful.
> When I try to run a Spark Write, I get a NullPointerException at
>
> java.lang.NullPointerException
> at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)
> at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:28)
> at
> org.apache.iceberg.types.TypeUtil$VisitFieldFuture.get(TypeUtil.java:331)
> at com.google.common.collect.Iterators$6.transform(Iterators.java:783)
> at
> com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
> at com.google.common.collect.Iterators.addAll(Iterators.java:356)
> at com.google.common.collect.Lists.newArrayList(Lists.java:143)
> at com.google.common.collect.Lists.newArrayList(Lists.java:130)
> at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:55)
> at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:28)
> at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:364)
> at org.apache.iceberg.types.TypeUtil$VisitFuture.get(TypeUtil.java:316)
> at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:40)
> at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:28)
> at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:336)
> at org.apache.iceberg.types.TypeUtil.reassignIds(TypeUtil.java:137)
> at
> org.apache.iceberg.spark.SparkSchemaUtil.convert(SparkSchemaUtil.java:163)
> at
> org.apache.iceberg.spark.source.IcebergSource.validateWriteSchema(IcebergSource.java:146)
> at
> org.apache.iceberg.spark.source.IcebergSource.createWriter(IcebergSource.java:76)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:255)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>
> Maybe I am making a mistake in Schema Creation?
>
> new Schema(
>         required(1, "mwId", Types.StringType.get()),
>         required(2, "mwVersion", LongType.get()),
>         required(3, "id", Types.LongType.get()),
>         required(4, "id_str", Types.StringType.get()),
>         optional(5, "text", Types.StringType.get()),
>         optional(6, "created_at", Types.StringType.get()),
>         optional(7, "lang", Types.StringType.get())
> );
>
> PartitionSpec I used was PartitionSpec.unpartitioned();
>
> The write code I used was:
>
> Dataset<TweetItem> myDS;
>
> ...... (populate myDS)
>
> myDS.write()
>         .format("iceberg")
>         .mode("append")
>         .save(getTableLocation());
>
>
> If I do a PrintSchema, I get:
>
> root
>  |-- columns: struct (nullable = true)
>  |-- created_at: string (nullable = true)
>  |-- encoder: struct (nullable = true)
>  |-- id: long (nullable = true)
>  |-- id_str: string (nullable = true)
>  |-- lang: string (nullable = true)
>  |-- mwId: string (nullable = true)
>  |-- mwVersion: long (nullable = true)
>  |-- partitionSpec: struct (nullable = true)
>  |-- schema: struct (nullable = true)
>  |    |-- aliases: map (nullable = true)
>  |    |    |-- key: string
>  |    |    |-- value: integer (valueContainsNull = true)
>  |-- tableLocation: string (nullable = true)
>  |-- text: string (nullable = true)
>
> Appreciate your help.
>
> regards
> Sandeep
>



-- 
Ryan Blue
Software Engineer
Netflix