Posted to user@spark.apache.org by karan alang <ka...@gmail.com> on 2022/02/16 06:36:37 UTC

SparkStructured Streaming using withWatermark - TypeError: 'module' object is not callable

Hello All,

I have a Structured Streaming PySpark program running on GCP Dataproc
that reads data from Kafka and does some data massaging and aggregation.
I'm trying to use withWatermark(), and it fails with the following error:

py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in call
    raise e
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 193, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 444, in convertToDictForEachBatch
    ap = Alarm(tdict, spark)
  File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 356, in __init__
    computeCount(l_alarm_df, l_alarm1_df)
  File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 262, in computeCount
    window(col("timestamp"), "10 minutes").alias("window")
TypeError: 'module' object is not callable

Details are in the Stack Overflow question below:
https://stackoverflow.com/questions/71137296/structuredstreaming-withwatermark-typeerror-module-object-is-not-callable

Any ideas on how to debug/fix this?
TIA!
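
For readers hitting the same message: the thread does not spell out the exact cause, but Python raises this TypeError whenever the name being called is bound to a module instead of a function or class. In PySpark, `pyspark.sql.window` is a module while `pyspark.sql.functions.window` is a function, so one plausible cause (an assumption, not something confirmed in this thread) is that `window` was imported from the wrong place. The sketch below reproduces the same class of error with the standard library only; `describe` is a hypothetical helper name.

```python
import inspect
import datetime  # binds the name `datetime` to a MODULE, not to the class


def describe(name, obj):
    """Report whether a name is bound to a module or to something callable."""
    kind = "module" if inspect.ismodule(obj) else type(obj).__name__
    return f"{name}: {kind}, callable={callable(obj)}"


message = None
try:
    datetime(2022, 2, 16)  # calling the module itself, like calling a module named `window`
except TypeError as exc:
    message = str(exc)

print(message)
print(describe("datetime", datetime))

# Importing the callable itself (here a class) avoids the error.
from datetime import datetime as datetime_cls

print(describe("datetime_cls", datetime_cls))
print(datetime_cls(2022, 2, 16).year)
```

Running the same `inspect.ismodule(window)` check just before the failing line in computeCount would show whether `window` is bound to a module at that point.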

Re: SparkStructured Streaming using withWatermark - TypeError: 'module' object is not callable

Posted by Mich Talebzadeh <mi...@gmail.com>.
OK, that sounds reasonable.

In the code below

    # Aggregation code in Alarm call, which uses withWatermark
    def computeCount(df_processedAlarm, df_totalAlarm):
        processedAlarmCnt = None
        if df_processedAlarm.count() > 0:
            processedAlarmCnt = df_processedAlarm.withWatermark("timestamp", "10 seconds") \
                .groupBy(
                    window(col("timestamp"), "1 minutes").alias("window")
                ).count()

it is more efficient to use take(1), which can stop after the first row
instead of counting every row:

        if len(df_processedAlarm.take(1)) > 0:
            processedAlarmCnt = df_processedAlarm.withWatermark("timestamp", "10 seconds") \
                .groupBy(
                    window(col("timestamp"), "1 minutes").alias("window")
                ).count()
        else:
            print("DataFrame is empty")
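
To illustrate why the take(1) check is cheaper, here is a plain-Python analogy rather than Spark code. It uses a generator that records how many rows were actually read. The names `counted`, `is_empty_by_count` and `is_empty_by_take` are made up for this sketch, and Spark's real cost also depends on partitioning and the query plan.

```python
from itertools import islice


def counted(rows, stats):
    """Yield rows one by one, recording how many were actually read."""
    for row in rows:
        stats["read"] += 1
        yield row


def is_empty_by_count(rows):
    # Analogue of `df.count() > 0`: every row is visited before deciding.
    return sum(1 for _ in rows) == 0


def is_empty_by_take(rows):
    # Analogue of `len(df.take(1)) > 0`: stop as soon as one row is seen.
    return len(list(islice(rows, 1))) == 0


data = range(100_000)
stats_count = {"read": 0}
stats_take = {"read": 0}

empty_a = is_empty_by_count(counted(data, stats_count))
empty_b = is_empty_by_take(counted(data, stats_take))

print(empty_a, stats_count["read"])
print(empty_b, stats_take["read"])
```

Both checks give the same answer, but the second one reads a single row before deciding.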

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 17 Feb 2022 at 06:33, karan alang <ka...@gmail.com> wrote:

> Hi Mich,
> the issue was related to incorrect, which is resolved.
>
> However, wrt your comment - 'OK sounds like your watermark is done
> outside of your processing.'
>
> In my use-case which primarily deals with syslogs, syslog is a string
> which needs to be parsed (with defensive coding built in to ensure records
> are in correct format), before it is fed to
> 3 different classes (AlarmProc being one of them) - where there is
> additional parsing + aggregation for specific types of logs.
> The way I'm handling this is by using -- foreachBatch(convertToDict) in
> the writeStream method, and the parsing + aggregation happens for the
> microbatch.
> foreachBatch - will wait for the parsing and aggregation to complete for
> the microbatch, and then proceed to do the same with the next microbatch.
>
> Since it involves a lot of parsing + aggregation, it requires more than a
> df.select() - hence the approach above is taken.
> From what I understand, the watermark is done within the processing ..
> since it is done per microbatch pulled with each trigger.
>
> Pls let me know if you have comments/suggestions on this approach.
>
> thanks,
> Karan Alang

Re: SparkStructured Streaming using withWatermark - TypeError: 'module' object is not callable

Posted by karan alang <ka...@gmail.com>.
Hi Mich,
the issue was related to incorrect, which is resolved.

However, wrt your comment - 'OK sounds like your watermark is done outside
of your processing.'

In my use case, which primarily deals with syslogs, each syslog is a string
that needs to be parsed (with defensive coding built in to ensure records
are in the correct format) before it is fed to 3 different classes
(AlarmProc being one of them), where there is additional parsing +
aggregation for specific types of logs.
The way I'm handling this is by using foreachBatch(convertToDict) in the
writeStream method, so the parsing + aggregation happens per microbatch.
foreachBatch waits for the parsing and aggregation to complete for one
microbatch, and then proceeds to do the same with the next microbatch.

Since it involves a lot of parsing + aggregation, it requires more than a
df.select(), hence the approach above.
From what I understand, the watermark is done within the processing,
since it is applied per microbatch pulled with each trigger.
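
As a rough mental model of what a 1-minute window with a 10-second watermark does across microbatches, here is a simplified plain-Python sketch. It is not Spark's implementation: timestamps are plain epoch seconds, and `window_start` and `run` are hypothetical names. The point is that the watermark is state carried from one microbatch to the next. As far as I know, Spark ignores withWatermark on a non-streaming DataFrame, which is what the function passed to foreachBatch receives, so please verify this against the documentation for your Spark version.

```python
from collections import Counter

WINDOW_SECONDS = 60  # mirrors window(col("timestamp"), "1 minutes")
DELAY_SECONDS = 10   # mirrors withWatermark("timestamp", "10 seconds")


def window_start(ts):
    """Floor an epoch-seconds timestamp to the start of its tumbling window."""
    return ts - (ts % WINDOW_SECONDS)


def run(batches):
    """Count events per window across microbatches, dropping late events."""
    counts = Counter()
    dropped = []
    max_event_time = None
    watermark = None  # carried across batches; None until the first batch is seen
    for batch in batches:
        for ts in batch:
            # An event is too late if its window ended at or before the watermark.
            if watermark is not None and window_start(ts) + WINDOW_SECONDS <= watermark:
                dropped.append(ts)
                continue
            counts[window_start(ts)] += 1
        if batch:
            newest = max(batch)
            max_event_time = newest if max_event_time is None else max(max_event_time, newest)
            # The watermark used for the NEXT batch trails the newest event seen so far.
            watermark = max_event_time - DELAY_SECONDS
    return dict(counts), dropped


batches = [
    [5, 20, 59],  # all fall in window [0, 60)
    [61, 130],    # watermark after this batch: 130 - 10 = 120
    [30, 125],    # 30 belongs to [0, 60), which ended before 120, so it is dropped
]
counts, dropped = run(batches)
print(counts)
print(dropped)
```

If each microbatch were treated in isolation, with no watermark carried over, the late event at 30 would simply be counted again in window [0, 60).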

Pls let me know if you have comments/suggestions on this approach.

thanks,
Karan Alang


On Wed, Feb 16, 2022 at 12:52 AM Mich Talebzadeh <mi...@gmail.com>
wrote:

> OK sounds like your watermark is done outside of your processing.

Re: SparkStructured Streaming using withWatermark - TypeError: 'module' object is not callable

Posted by Mich Talebzadeh <mi...@gmail.com>.
OK sounds like your watermark is done outside of your processing.

Check this

            # construct a streaming dataframe streamingDataFrame that subscribes to topic temperature
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", "temperature") \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))


            resultM = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.timestamp").alias("timestamp") \
                   , col("parsed_value.temperature").alias("temperature"))
            result = resultM. \
                     withWatermark("timestamp", "5 minutes"). \
                     groupBy(window(resultM.timestamp, "5 minutes", "5 minutes")). \
                     avg('temperature'). \
                     writeStream. \
                     outputMode('complete'). \
                     option("numRows", 1000). \
                     option("truncate", "false"). \
                     format('console'). \
                     option('checkpointLocation', checkpoint_path). \
                     queryName("temperature"). \
                     start()

HTH


