Posted to issues@spark.apache.org by "Jeffrey(Xilang) Yan (JIRA)" <ji...@apache.org> on 2019/05/31 06:29:00 UTC

[jira] [Created] (SPARK-27894) PySpark streaming transform RDD join not works when checkpoint enabled

Jeffrey(Xilang) Yan created SPARK-27894:
-------------------------------------------

             Summary: PySpark streaming transform RDD join not works when checkpoint enabled
                 Key: SPARK-27894
                 URL: https://issues.apache.org/jira/browse/SPARK-27894
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.4.0
            Reporter: Jeffrey(Xilang) Yan


In PySpark Streaming, if checkpointing is enabled and there is a transform-join operation, an error is thrown.

{{from pyspark import SparkContext}}

{{from pyspark.streaming import StreamingContext}}

{{from pyspark.streaming.kafka import KafkaUtils}}

{{sc = SparkContext(appName='xxxx')}}

{{sc.setLogLevel("WARN")}}

{{ssc = StreamingContext(sc, 10)}}

{{ssc.checkpoint("hdfs://xxxx/test")}}

{{kafka_bootstrap_servers = "xxxx"}}

{{topics = ['xxxx', 'xxxx']}}

{{doc_info = sc.parallelize(((1, 2), (4, 5), (7, 8), (10, 11)))}}

{{kvds = KafkaUtils.createDirectStream(ssc, topics, kafkaParams={"metadata.broker.list": kafka_bootstrap_servers})}}

{{line = kvds.map(lambda x: (1, 2))}}

{{line.transform(lambda rdd: rdd.join(doc_info)).pprint(10)}}

{{ssc.start()}}

{{ssc.awaitTermination()}}

Error details:

{{PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. }}

Similar code works fine in Scala. The error also disappears if we remove either

{{ssc.checkpoint("hdfs://xxxx/test") }}

or

{{line.transform(lambda rdd:rdd.join(doc_info)) }}

 

It seems that when checkpointing is enabled, PySpark serializes the transform lambda and, with it, the RDD that the lambda references. Because an RDD cannot be serialized, the error is raised.
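
The suspected mechanism can be illustrated without Spark. The following is only a minimal plain-Python sketch, not PySpark's actual code path: FakeRDD, try_checkpoint and join_batch are hypothetical names invented for illustration, and the standard pickle module stands in for whatever serializer PySpark uses. It shows that serializing the variables captured by a transform function fails when one of them is a handle that refuses pickling, and succeeds when only plain data is captured.

```python
import pickle


class FakeRDD:
    """Hypothetical stand-in for an RDD: a driver-side handle that refuses pickling."""

    def __init__(self, pairs):
        self.pairs = tuple(pairs)

    def __reduce__(self):
        # pickle calls this hook; raising here mimics the failure in the report.
        raise Exception("cannot reference an RDD from a serialized function")


def try_checkpoint(captured):
    """Pickle the variables a transform function captured; return bytes or None."""
    try:
        return pickle.dumps(captured)
    except Exception:
        return None


def join_batch(batch, pairs):
    """Plain-Python inner join on key, standing in for rdd.join()."""
    lookup = dict(pairs)
    return [(k, (v, lookup[k])) for k, v in batch if k in lookup]


doc_info = ((1, 2), (4, 5), (7, 8), (10, 11))

# Capturing the handle itself: serialization fails.
failed = try_checkpoint({"doc_info": FakeRDD(doc_info)})

# Capturing only the plain data: serialization succeeds and survives a round trip.
saved = try_checkpoint({"doc_info": doc_info})
restored = pickle.loads(saved)
joined = join_batch([(1, 2)], restored["doc_info"])
```

If this reading is right, a possible workaround (an assumption, not verified against 2.4.0) is to keep doc_info as a plain tuple and build the RDD inside the transform function, for example rdd.join(rdd.context.parallelize(doc_info)), so that the lambda captures no RDD.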


