Posted to user@spark.apache.org by RanXin <ra...@163.com> on 2019/06/23 09:14:54 UTC

Structured Streaming foreach function

I am using Spark 2.4.3 with Python to build a Structured Streaming job. What is
the data type of the parameter "row" in the process_row function? The following
code is how the official programming guide instructs us to use the foreach
function:
def process_row(row):
    # Write row to storage
    pass

query = streamingDF.writeStream.foreach(process_row).start()

Thanks a lot.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Structured Streaming foreach function

Posted by Magnus Nilsson <ma...@kth.se>.
Row is a generic ordered collection of fields that most likely carries a
schema of type StructType. You need to keep track of the data types of the
fields yourself.

If you want compile-time safety of data types (and IntelliSense support) you
need to use RDDs or the Dataset[T] API. Dataset[T] might incur overhead
and break partition filtering pushdown etc. if you don't take care, but it
will give you compile-time errors. You still need to make sure the real
underlying data types conform to the schema when you cast the DataFrame.
Note that there is no Dataset API for Python.

https://spark.apache.org/docs/2.4.2/api/java/org/apache/spark/sql/Row.html

Basically you need to check the schema of your input and treat your columns
accordingly.
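
As a rough illustration for the Python side: in PySpark the object passed to
the foreach function should be a pyspark.sql.Row, which supports access by
field name and has an asDict() method. The sketch below relies only on
asDict(), so it does not import Spark itself. The column names "id" and "name"
and the helper row_to_record are assumptions made up for this example, not
part of the original question or of any Spark API.

```python
# Hypothetical column names and the Python types expected for them.
# Replace these with the fields of your own schema.
EXPECTED_TYPES = {"id": int, "name": str}


def row_to_record(row):
    """Convert a Row-like object to a dict, checking each field's Python type."""
    record = row.asDict()  # pyspark.sql.Row provides asDict()
    for field, expected in EXPECTED_TYPES.items():
        value = record[field]
        # Nullable columns arrive as None, so allow that explicitly.
        if value is not None and not isinstance(value, expected):
            raise TypeError(
                f"{field}: expected {expected.__name__}, "
                f"got {type(value).__name__}"
            )
    return record


def process_row(row):
    record = row_to_record(row)
    # Write the record to storage here; printing is only a placeholder.
    print(record)
```

It would be wired in the same way as in the question, that is
streamingDF.writeStream.foreach(process_row).start(). Calling
streamingDF.printSchema() beforehand shows which column names and types to
expect.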

DataType reference.
http://spark.apache.org/docs/latest/sql-reference.html

