Posted to user@spark.apache.org by Kuttaiah Robin <ku...@gmail.com> on 2018/10/05 13:22:44 UTC
Recreate Dataset from list of Row in spark streaming application.
Hello,
I have a Spark streaming application which reads from Kafka and parses the
message value with a given schema.
Dataset<Row> m_oKafkaEvents = getSparkSession().readStream().format("kafka")
    .option("kafka.bootstrap.servers", strKafkaAddress)
    .option("assign", strSubscription)
    .option("maxOffsetsPerTrigger", "100000")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", false)
    .load()
    .filter(strFilter)
    .select(functions.from_json(functions.col("value").cast("string"), schema)
        .alias("events"))
    .select("events.*");
Now this dataset is grouped by one of the columns (InstanceId), which is the
key for us, and then fed into the flatMapGroupsWithState function, which does
some correlation.
Dataset<InsightEventUpdate> sessionUpdates = m_oKafkaEvents.groupByKey(
    new MapFunction<Row, String>() {
      @Override
      public String call(Row event) {
        return event.getAs("InstanceId");
      }
    }, Encoders.STRING())
    .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
        Encoders.bean(InsightEventInfo.class),
        Encoders.bean(InsightEventUpdate.class),
        GroupStateTimeout.ProcessingTimeTimeout());
The output dataset is of type InsightEventUpdate, which contains a List of
Spark Rows related to the InstanceId.
Now I want to convert this back into a Dataset<Row>. Basically I have a
List of Rows.
I tried
sparkSession.createDataFrame(listOfRows, schema);
This gives me:

java.lang.NullPointerException
  at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
  at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
  at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
  at oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor.process(ForeachFederatedEventProcessor.java:102)
Can someone tell me the way to go ahead?
thanks
Robin Kuttaiah
Re: Recreate Dataset from list of Row in spark streaming application.
Posted by Kuttaiah Robin <ku...@gmail.com>.
Thanks Ryan.
Yes, that is a ForeachWriter.
Can someone help me with how to solve this?
Basically the output of the flatMapGroupsWithState function is

Dataset<InsightEventUpdate> sessionUpdates;

The InsightEventUpdate class contains a list of Spark Rows which I need to
convert back to a Dataset<Row>. Something like

Dataset<Row> correlatedEvents = <do something here with sessionUpdates>;

Please note this is a streaming application.
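One approach, as a sketch only (untested against your classes; it assumes
InsightEventUpdate exposes its list through a getter, hypothetically named
getRows() here, and that `schema` is the StructType of those rows), is to
flatten the lists inside the streaming Dataset itself with flatMap and a
RowEncoder, so the conversion happens in the query plan instead of in a
ForeachWriter:

```java
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;

// Sketch: flatten the List<Row> held by each InsightEventUpdate back into
// individual rows. getRows() is a hypothetical accessor for the list stored
// in the bean; `schema` must match the structure of those rows.
Dataset<Row> correlatedEvents = sessionUpdates.flatMap(
    (FlatMapFunction<InsightEventUpdate, Row>) update ->
        update.getRows().iterator(),
    RowEncoder.apply(schema));
```

Because this stays inside the streaming query, no SparkSession is needed on
the executors, and correlatedEvents remains a streaming Dataset<Row> that can
be handed to writeStream().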
regards,
Robin.
On Fri, Oct 5, 2018 at 11:12 PM Shixiong(Ryan) Zhu <sh...@databricks.com>
wrote:
> oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor
> is a ForeachWriter, right? You cannot use SparkSession in its process
> method, as it runs on the executors.
>
> Best Regards,
> Ryan
>
>
Re: Recreate Dataset from list of Row in spark streaming application.
Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor is
a ForeachWriter, right? You cannot use SparkSession in its process method,
as it runs on the executors.
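To illustrate the constraint (a hedged sketch; the sink format, paths, and
the correlatedEvents name from the thread are placeholders): per-row side
effects belong in process(), while Dataset-level transformations must be
built on the driver before the sink is started:

```java
// Wrong: process() runs on an executor, where the driver's SparkSession is
// not available, hence the NullPointerException inside sessionState.
//   public void process(InsightEventUpdate value) {
//     sparkSession.createDataFrame(value.getRows(), schema); // NPE here
//   }

// Right: convert to Dataset<Row> on the driver, then attach a sink.
correlatedEvents
    .writeStream()
    .outputMode("append")
    .format("parquet")                          // placeholder sink
    .option("path", "/tmp/correlated-events")   // placeholder path
    .option("checkpointLocation", "/tmp/ckpt")  // required for streaming
    .start();
```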
Best Regards,
Ryan