You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2018/08/21 17:38:00 UTC

[jira] [Commented] (SPARK-24335) Dataset.map schema not applied in some cases

    [ https://issues.apache.org/jira/browse/SPARK-24335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587785#comment-16587785 ] 

Apache Spark commented on SPARK-24335:
--------------------------------------

User 'redsanket' has created a pull request for this issue:
https://github.com/apache/spark/pull/22173

> Dataset.map schema not applied in some cases
> --------------------------------------------
>
>                 Key: SPARK-24335
>                 URL: https://issues.apache.org/jira/browse/SPARK-24335
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0, 2.3.0
>            Reporter: Robert Reid
>            Priority: Major
>
> In the following code an {color:#808080}UnsupportedOperationException{color} is thrown in the filter() call just after the Dateset.map() call unless withWatermark() is added between them. The error reports `{color:#808080}fieldIndex on a Row without schema is undefined{color}`.  I expect the map() method to have applied the schema and for it to be accessible in filter().  Without the extra withWatermark() call my debugger reports that the `row` objects in the filter lambda are `GenericRow`.  With the watermark call it reports that they are `GenericRowWithSchema`.
> I should add that I'm new to working with Structured Streaming.  So if I'm overlooking some implied dependency please fill me in.
> I'm encountering this in new code for a new production job. The presented code is distilled down to demonstrate the problem.  While the problem can be worked around simply by adding withWatermark() I'm concerned that this will leave the code in a fragile state.  With this simplified code if this error occurs again it will be easy to identify what change led to the error.  But in the code I'm writing, with this functionality delegated to other classes, it is (and has been) very challenging to identify the cause.
>  
> {code:java}
> public static void main(String[] args) {
>     SparkSession sparkSession = SparkSession.builder().master("local").getOrCreate();
>     sparkSession.conf().set(
>             "spark.sql.streaming.checkpointLocation",
>             "hdfs://localhost:9000/search_relevance/checkpoint" // for spark 2.3
>             // "spark.sql.streaming.checkpointLocation", "tmp/checkpoint" // for spark 2.1
>     );
>     StructType inSchema = DataTypes.createStructType(
>             new StructField[] {
>                     DataTypes.createStructField("id", DataTypes.StringType      , false),
>                     DataTypes.createStructField("ts", DataTypes.TimestampType   , false),
>                     DataTypes.createStructField("f1", DataTypes.LongType        , true)
>             }
>     );
>     Dataset<Row> rawSet = sparkSession.sqlContext().readStream()
>             .format("rate")
>             .option("rowsPerSecond", 1)
>             .load()
>             .map(   (MapFunction<Row, Row>) raw -> {
>                         Object[] fields = new Object[3];
>                         fields[0] = "id1";
>                         fields[1] = raw.getAs("timestamp");
>                         fields[2] = raw.getAs("value");
>                         return RowFactory.create(fields);
>                     },
>                     RowEncoder.apply(inSchema)
>             )
>             // If withWatermark() is included above the filter() line then this works.  Without it we get:
>             //    Caused by: java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.
>             // at the row.getAs() call.
>             // .withWatermark("ts", "10 seconds")  // <-- This is required for row.getAs("f1") to work ???
>             .filter((FilterFunction<Row>) row -> !row.getAs("f1").equals(0L))
>             .withWatermark("ts", "10 seconds")
>             ;
>     StreamingQuery streamingQuery = rawSet
>             .select("*")
>             .writeStream()
>             .format("console")
>             .outputMode("append")
>             .start();
>     try {
>         streamingQuery.awaitTermination(30_000);
>     } catch (StreamingQueryException e) {
>         System.out.println("Caught exception at 'awaitTermination':");
>         e.printStackTrace();
>     }
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org