Posted to issues@spark.apache.org by "KARTHIKEYAN RASIPALAYAM DURAIRAJ (JIRA)" <ji...@apache.org> on 2018/08/03 15:11:00 UTC
[jira] [Created] (SPARK-25014) When we tried to read kafka topic through spark streaming spark submit is getting failed with Python worker exited unexpectedly (crashed) error
KARTHIKEYAN RASIPALAYAM DURAIRAJ created SPARK-25014:
--------------------------------------------------------
Summary: When we tried to read kafka topic through spark streaming spark submit is getting failed with Python worker exited unexpectedly (crashed) error
Key: SPARK-25014
URL: https://issues.apache.org/jira/browse/SPARK-25014
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 2.3.1
Reporter: KARTHIKEYAN RASIPALAYAM DURAIRAJ
Fix For: 2.3.2
Hi Team,
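# Imports are not shown in the report; these are assumed (PySpark 2.3's
# Kafka 0.8 integration and confluent-kafka-python's Avro helpers).
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

TOPIC = 'NBC_APPS.TBL_MS_ADVERTISER'
PARTITION = 0
topicAndPartition = TopicAndPartition(TOPIC, PARTITION)
# Unused below. The dict value is a starting offset; here it is int(PARTITION), i.e. 0.
fromOffsets1 = {topicAndPartition: int(PARTITION)}

def handler(message):
    # Collects every (key, value) record of the micro-batch on the driver.
    records = message.collect()
    for record in records:
        value_all = record[1]   # decoded value
        value_key = record[0]   # decoded key
        # print(value_all)

# Avro deserializer backed by a local Schema Registry.
schema_registry_client = CachedSchemaRegistryClient(url='http://localhost:8081')
serializer = MessageSerializer(schema_registry_client)

sc = SparkContext(appName="PythonStreamingAvro")
ssc = StreamingContext(sc, 10)  # 10-second batch interval
kvs = KafkaUtils.createDirectStream(
    ssc, ['NBC_APPS.TBL_MS_ADVERTISER'],
    {"metadata.broker.list": 'localhost:9092'},
    valueDecoder=serializer.decode_message)
lines = kvs.map(lambda x: x[1])
lines.pprint()
kvs.foreachRDD(handler)
ssc.start()
ssc.awaitTermination()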
This is the code we are using to pull data from the Kafka topic. When we run it through spark-submit, we get the error below:
2018-08-03 11:10:40 INFO VerifiableProperties:68 - Property zookeeper.connect is overridden to
2018-08-03 11:10:40 ERROR PythonRunner:91 - Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/KarthikeyanDurairaj/Desktop/Sparkshell/python/lib/pyspark.zip/pyspark/worker.py", line 215, in main
    eval_type = read_int(infile)
  File "/Users/KarthikeyanDurairaj/Desktop/Sparkshell/python/lib/pyspark.zip/pyspark/serializers.py", line 685, in read_int
    raise EOFError
EOFError
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)