Posted to issues@spark.apache.org by "Kai-Michael Roesner (Jira)" <ji...@apache.org> on 2022/10/28 10:25:00 UTC

[jira] [Updated] (SPARK-40952) Exception when handling timestamp data in PySpark Structured Streaming

     [ https://issues.apache.org/jira/browse/SPARK-40952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kai-Michael Roesner updated SPARK-40952:
----------------------------------------
    Description: 
I'm trying to process data that contains timestamps in PySpark Structured Streaming using the [foreach|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach] option. When I run the job I get an {{OSError: [Errno 22] Invalid argument}} exception in {{pyspark\sql\types.py}} at the
{noformat}
return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
{noformat}
statement.
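
For what it's worth, this looks consistent with {{datetime.fromtimestamp}} rejecting negative (pre-1970 UTC) epoch values on Windows. A standalone sketch that reproduces the error outside Spark (my addition; it assumes a UTC+1 local timezone, where {{1970-01-01 00:59:59.999}} local time is epoch microsecond -1000):
{noformat}
import datetime

# Microseconds since the epoch, as PySpark passes them internally.
# In a UTC+1 timezone, 1970-01-01 00:59:59.999 local time is -1000.
ts = -1000

# On Windows, fromtimestamp cannot handle negative values and raises
# OSError: [Errno 22] Invalid argument (assumed root cause here).
print(datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000))
{noformat}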

I have boiled down my Spark job to the essentials:
{noformat}
from pyspark.sql import SparkSession

def handle_row(row):
  print(f'Processing: {row}')

spark = (SparkSession.builder
  .appName('test.stream.tstmp.byrow')
  .getOrCreate())

data = (spark.readStream
  .option('delimiter', ',')
  .option('header', True)
  .schema('a integer, b string, c timestamp')
  .csv('data/test'))

query = (data.writeStream
  .foreach(handle_row)
  .start())

query.awaitTermination()
{noformat}
In the {{data/test}} folder I have one CSV file:
{noformat}
a,b,c
1,x,1970-01-01 00:59:59.999
2,y,1999-12-31 23:59:59.999
3,z,2022-10-18 15:53:12.345
{noformat}
If I change the CSV schema to {{'a integer, b string, c string'}}, everything works fine and I get the expected output of
{noformat}
Processing: Row(a=1, b='x', c='1970-01-01 00:59:59.999')
Processing: Row(a=2, b='y', c='1999-12-31 23:59:59.999')
Processing: Row(a=3, b='z', c='2022-10-18 15:53:12.345')
{noformat}
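A per-row handler can then parse the string itself. A minimal sketch (my addition; the {{strptime}} format is an assumption based on this CSV):
{noformat}
from datetime import datetime

def handle_row(row):
  # Under the all-string schema, column c arrives as a plain string,
  # so convert it in Python instead (format assumed from the CSV above).
  ts = datetime.strptime(row.c, '%Y-%m-%d %H:%M:%S.%f')
  print(f'Processing: a={row.a}, b={row.b}, c={ts}')
{noformat}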
Also, if I change the stream handling to [micro-batches|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch] like so:
{noformat}
...
def handle_batch(df, epoch_id):
  print(f'Processing: {df} - Epoch: {epoch_id}')
...
query = (data.writeStream
  .foreachBatch(handle_batch)
  .start())
{noformat}
I get the expected output of
{noformat}
Processing: DataFrame[a: int, b: string, c: timestamp] - Epoch: 0
{noformat}
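If per-row handling is needed in the meantime, a {{foreachBatch}} sketch (my addition; it assumes casting {{c}} to string before {{collect()}} sidesteps the failing Python conversion):
{noformat}
def handle_batch(df, epoch_id):
  # Cast the timestamp to string on the JVM side before collecting,
  # so rows never hit the failing fromtimestamp conversion (assumption).
  for row in df.withColumn('c', df.c.cast('string')).collect():
    print(f'Processing: {row} - Epoch: {epoch_id}')
{noformat}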
But "by row" handling should work with the row having the correct column data type of `timestamp`.


> Exception when handling timestamp data in PySpark Structured Streaming
> ----------------------------------------------------------------------
>
>                 Key: SPARK-40952
>                 URL: https://issues.apache.org/jira/browse/SPARK-40952
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Structured Streaming, Windows
>    Affects Versions: 3.3.0
>         Environment: OS: Windows 10 
>            Reporter: Kai-Michael Roesner
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org