Posted to issues@spark.apache.org by "Jeffrey(Xilang) Yan (JIRA)" <ji...@apache.org> on 2019/05/31 06:29:00 UTC
[jira] [Created] (SPARK-27894) PySpark streaming transform RDD join not works when checkpoint enabled
Jeffrey(Xilang) Yan created SPARK-27894:
-------------------------------------------
Summary: PySpark streaming transform RDD join not works when checkpoint enabled
Key: SPARK-27894
URL: https://issues.apache.org/jira/browse/SPARK-27894
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 2.4.0
Reporter: Jeffrey(Xilang) Yan
In PySpark Streaming, if checkpointing is enabled and the job contains a transform-join operation, an error is thrown.
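{{from pyspark import SparkContext}}
{{from pyspark.streaming import StreamingContext}}
{{from pyspark.streaming.kafka import KafkaUtils}}
{{sc = SparkContext(appName='xxxx')}}
{{sc.setLogLevel("WARN")}}
{{ssc = StreamingContext(sc, 10)  # 10-second batch interval}}
{{ssc.checkpoint("hdfs://xxxx/test")  # enabling checkpointing triggers the error}}
{{kafka_bootstrap_servers = "xxxx"}}
{{topics = ['xxxx', 'xxxx']}}
{{# static RDD that every batch is joined against}}
{{doc_info = sc.parallelize(((1, 2), (4, 5), (7, 8), (10, 11)))}}
{{kvds = KafkaUtils.createDirectStream(ssc, topics, kafkaParams={"metadata.broker.list": kafka_bootstrap_servers})}}
{{line = kvds.map(lambda x: (1, 2))}}
{{# the lambda references doc_info, an RDD defined outside the transform}}
{{line.transform(lambda rdd: rdd.join(doc_info)).pprint(10)}}
{{ssc.start()}}
{{ssc.awaitTermination()}}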
Error details:
{{PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. }}
Similar code works fine in Scala. The error also disappears if we remove either
{{ssc.checkpoint("hdfs://xxxx/test")}}
or
{{line.transform(lambda rdd: rdd.join(doc_info))}}
It seems that when checkpointing is enabled, PySpark serializes the transform lambda and, along with it, the RDD that the lambda references. Since an RDD cannot be serialized, the error is raised.
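The suspected mechanism can be sketched without Spark. This is only an illustration, not PySpark code: {{FakeRDD}}, {{make_join}} and {{closure_is_serializable}} are hypothetical names, and the standard {{pickle}} module stands in for the by-value function serializer that PySpark uses. It assumes, as the error message suggests, that an RDD refuses pickling through its {{__getnewargs__}} hook. Note that in the snippet above {{doc_info}} is a module-level global rather than a closure variable; the sketch uses a closure only to keep the check short.

```python
import pickle


class FakeRDD:
    """Hypothetical stand-in for an RDD: iterable, but refuses to be pickled."""

    def __init__(self, pairs):
        self._pairs = list(pairs)

    def __iter__(self):
        return iter(self._pairs)

    def __getnewargs__(self):
        # pickle (protocol 2 and later) calls this hook while serializing the instance.
        raise Exception("attempt to serialize an RDD")


def make_join(other):
    """Return a lambda that joins a batch of (key, value) pairs against `other`.

    The lambda captures `other` in its closure, much like the issue's lambda
    refers to doc_info.
    """
    return lambda batch: [(k, (v, w)) for k, v in batch for k2, w in other if k2 == k]


def closure_is_serializable(func):
    """Approximate a by-value function serializer: try to pickle every captured value."""
    try:
        for cell in func.__closure__ or ():
            pickle.dumps(cell.cell_contents)
        return True
    except Exception:
        return False


doc_pairs = ((1, 2), (4, 5), (7, 8), (10, 11))
join_rdd = make_join(FakeRDD(doc_pairs))  # captures an RDD-like object
join_plain = make_join(doc_pairs)         # captures plain, picklable data

print(join_rdd([(1, 2)]) == join_plain([(1, 2)]))  # True: both joins behave the same
print(closure_is_serializable(join_rdd))            # False
print(closure_is_serializable(join_plain))          # True
```

Both lambdas compute the same join; only the one that captured the RDD-like object fails once its captured state has to be serialized, which matches the observation that the error appears only when checkpointing is turned on.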