Posted to user@spark.apache.org by Kuttaiah Robin <ku...@gmail.com> on 2018/10/05 13:22:44 UTC

Recreate Dataset from list of Row in spark streaming application.

Hello,

I have a spark streaming application which reads from Kafka based on the
given schema.

Dataset<Row> m_oKafkaEvents = getSparkSession().readStream().format("kafka")
    .option("kafka.bootstrap.servers", strKafkaAddress)
    .option("assign", strSubscription)
    .option("maxOffsetsPerTrigger", "100000")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", false)
    .load()
    .filter(strFilter)
    .select(functions.from_json(functions.col("value").cast("string"), schema)
        .alias("events"))
    .select("events.*");


This dataset is then grouped by one of its columns (InstanceId), which is the
key for us, and fed into the flatMapGroupsWithState function, which does some
correlation.

Dataset<InsightEventUpdate> sessionUpdates = m_oKafkaEvents.groupByKey(
    new MapFunction<Row, String>() {
      @Override public String call(Row event) {
        return event.getAs("InstanceId");
      }
    }, Encoders.STRING())
    .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
        Encoders.bean(InsightEventInfo.class),
        Encoders.bean(InsightEventUpdate.class),
        GroupStateTimeout.ProcessingTimeTimeout());
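The idstateUpdateFunction referenced above is not shown; purely for context, a
minimal sketch of the shape such a function would take follows. The accumulator
and accessor methods on InsightEventInfo and InsightEventUpdate are hypothetical
placeholders, not taken from the real code:

```java
import java.util.Collections;
import java.util.Iterator;
import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.GroupState;

FlatMapGroupsWithStateFunction<String, Row, InsightEventInfo, InsightEventUpdate>
    idstateUpdateFunction = (String instanceId, Iterator<Row> events,
                             GroupState<InsightEventInfo> state) -> {
  // Load or create the per-InstanceId correlation state.
  InsightEventInfo info = state.exists() ? state.get() : new InsightEventInfo();
  while (events.hasNext()) {
    info.addEvent(events.next());   // hypothetical accumulator method
  }
  state.update(info);
  // Emit one update carrying the rows correlated so far.
  InsightEventUpdate update = new InsightEventUpdate();
  update.setRows(info.getRows());   // hypothetical accessors
  return Collections.singletonList(update).iterator();
};
```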


The output dataset is of type InsightEventUpdate, which contains a List of
Spark Rows related to the InstanceId.

Now I want to convert this back into a Dataset<Row>. Basically I have a
List of rows.

I tried

sparkSession.createDataFrame(listOfRows, schema);

this gives me

java.lang.NullPointerException
        at
org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
        at
org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
        at
org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
        at
oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor.process(ForeachFederatedEventProcessor.java:102)

Can someone tell me the right way to go ahead?

thanks
Robin Kuttaiah

Re: Recreate Dataset from list of Row in spark streaming application.

Posted by Kuttaiah Robin <ku...@gmail.com>.
Thanks Ryan.

Yes, that is a ForeachWriter.

Can someone help me with how to solve this?

Basically, the output of the flatMapGroupsWithState function is

Dataset<InsightEventUpdate> sessionUpdates;

The InsightEventUpdate class contains a list of Spark Rows which I need to
convert back to a Dataset<Row>. Something like:

Dataset<Row> correlatedEvents = <do something here with sessionUpdates>;

Please note this is a streaming application.
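Since sessionUpdates is itself a streaming Dataset, one possible approach (a
rough sketch, not tested here) is to flatten each update's row list back into a
Dataset<Row> with flatMap and an explicit row encoder, inside the streaming
query plan rather than inside a sink, so no SparkSession is needed on the
executors. getRows() is an assumed accessor on InsightEventUpdate, and schema
is the same StructType used for from_json:

```java
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;

// Flatten the per-update correlated rows back into a streaming Dataset<Row>.
Dataset<Row> correlatedEvents = sessionUpdates.flatMap(
    (FlatMapFunction<InsightEventUpdate, Row>) update ->
        update.getRows().iterator(),   // getRows() is an assumed accessor
    RowEncoder.apply(schema));
```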

regards,
Robin.




On Fri, Oct 5, 2018 at 11:12 PM Shixiong(Ryan) Zhu <sh...@databricks.com>
wrote:

> oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor
> is a ForeachWriter, right? You cannot use SparkSession in its process
> method, as that method runs on the executors.
>
> Best Regards,
> Ryan
>
>
> On Fri, Oct 5, 2018 at 6:54 AM Kuttaiah Robin <ku...@gmail.com> wrote:
>
>> [quoted original message trimmed]
>

Re: Recreate Dataset from list of Row in spark streaming application.

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor is
a ForeachWriter, right? You cannot use SparkSession in its process method,
as that method runs on the executors.
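To illustrate: the ForeachWriter lifecycle methods all run on the executors,
once per partition per trigger, so only serializable, executor-safe resources
belong inside them. A minimal skeleton (the class name and sink details are
illustrative only, not the actual processor):

```java
import org.apache.spark.sql.ForeachWriter;

public class FederatedEventWriter extends ForeachWriter<InsightEventUpdate> {
  @Override
  public boolean open(long partitionId, long version) {
    // Runs on an executor: open a connection to the external sink here.
    return true;  // true = process this partition
  }

  @Override
  public void process(InsightEventUpdate update) {
    // Runs on an executor: write the update to the sink.
    // The SparkSession lives on the driver and is not available here.
  }

  @Override
  public void close(Throwable errorOrNull) {
    // Runs on an executor: release the resources opened in open().
  }
}
```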

Best Regards,
Ryan


On Fri, Oct 5, 2018 at 6:54 AM Kuttaiah Robin <ku...@gmail.com> wrote:

> [quoted original message trimmed]