Posted to user@spark.apache.org by Nick Dawes <ni...@gmail.com> on 2020/01/17 23:27:22 UTC

Extract value from streaming Dataframe to a variable

I need to extract a value from a PySpark structured streaming Dataframe to
a string variable to check something.

I tried this code.

agentName = kinesisDF.select(kinesisDF.agentName.getItem(0).alias("agentName")).collect()[0][0]

This works only on a non-streaming DataFrame. On a streaming DataFrame,
collect is not supported.

Any workaround for this?

Nick

Re: Extract value from streaming Dataframe to a variable

Posted by Jungtaek Lim <ka...@gmail.com>.
`foreachBatch` was added in Spark 2.4.0, if I understand correctly, so
in any language you'll need to upgrade Spark to 2.4.x to use
`foreachBatch`. PySpark is covered as well.

https://issues.apache.org/jira/browse/SPARK-24565
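
For readers on Spark 2.4 or later, a minimal PySpark sketch of the
foreachBatch approach might look like the following. `kinesisDF` and the
`agentName` array column come from the original question; `verify_agent`,
`seen_agents`, and `start_query` are hypothetical names used only for
illustration and are not part of Spark.

```python
# Sketch only, assuming Spark 2.4+ where DataStreamWriter.foreachBatch exists.

seen_agents = []  # driver-side list, kept only to make the sketch observable


def verify_agent(agent_name):
    """Hypothetical stand-in for whatever check is run on the value."""
    seen_agents.append(agent_name)
    return bool(agent_name)


def process_batch(batch_df, batch_id):
    """Called on the driver once per micro-batch.

    batch_df is a regular (non-streaming) DataFrame, so collect() is allowed.
    """
    rows = (
        batch_df
        .select(batch_df.agentName.getItem(0).alias("agentName"))
        .limit(1)  # only one value is needed, so avoid collecting the batch
        .collect()
    )
    if not rows:  # empty micro-batch: nothing to extract
        return
    agent_name = rows[0][0]  # plain Python value on the driver
    verify_agent(agent_name)


def start_query(streaming_df):
    """Attach process_batch to a streaming DataFrame and start the query."""
    return streaming_df.writeStream.foreachBatch(process_batch).start()
```

With the DataFrame from the question this would be started as
`query = start_query(kinesisDF)`. The extracted value is only available
inside `process_batch`, once per micro-batch, rather than as a single
variable next to the query definition.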



Re: Extract value from streaming Dataframe to a variable

Posted by Nick Dawes <ni...@gmail.com>.
Thanks for your reply.

I'm using Spark 2.3.2. It looks like the foreach operation is only supported
for Java and Scala in that version. Is there any alternative for Python?


Re: Extract value from streaming Dataframe to a variable

Posted by Jungtaek Lim <ka...@gmail.com>.
Hi,

You can try foreachBatch to apply batch query operations to the output of
each micro-batch:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)



Re: Extract value from streaming Dataframe to a variable

Posted by Nick Dawes <ni...@gmail.com>.
Streaming experts, any clues how to achieve this?

After extracting a few variables, I need to run them through a REST API for
verification and decision making.

Thanks for your help.

Nick
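
As a rough illustration of the verification step described above, the
sketch below posts the extracted values to a REST endpoint and turns the
response into a yes/no decision. Everything specific here is an
assumption: the URL, the JSON request shape, and the `approved` field in
the response are hypothetical and do not come from the thread.

```python
import json
import urllib.request

# Hypothetical endpoint, used only for illustration.
VERIFY_URL = "https://example.invalid/verify"


def build_request(values, url=VERIFY_URL):
    """Build a JSON POST request carrying the extracted values."""
    body = json.dumps(values).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def verify(values, send=urllib.request.urlopen, timeout=5):
    """Send the values and return True if the service approves them.

    Assumes the service replies with JSON such as {"approved": true}.
    `send` is injectable so the function can be exercised without a network.
    """
    with send(build_request(values), timeout=timeout) as response:
        payload = json.loads(response.read().decode("utf-8"))
    return bool(payload.get("approved", False))
```

A call such as `verify({"agentName": agent_name})` would run on the
driver, wherever the extracted value is available as a plain Python
string.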
