Posted to issues@spark.apache.org by "Aditya (JIRA)" <ji...@apache.org> on 2019/07/10 12:54:00 UTC

[jira] [Created] (SPARK-28336) Same code runs fine on my local machine in PyCharm, but on EC2, show() fails to display the DataFrame converted from my RDD of JSON values

Aditya created SPARK-28336:
------------------------------

             Summary: Same code runs fine on my local machine in PyCharm, but on EC2, show() fails to display the DataFrame converted from my RDD of JSON values
                 Key: SPARK-28336
                 URL: https://issues.apache.org/jira/browse/SPARK-28336
             Project: Spark
          Issue Type: Bug
          Components: Deploy, DStreams, EC2, PySpark, Spark Submit
    Affects Versions: 2.4.3
         Environment: Using EC2 Ubuntu 18.04.2 LTS

Spark version: Spark 2.4.3 built for Hadoop 2.7.3

Kafka version: kafka_2.12-2.2.1
            Reporter: Aditya


I am a beginner with PySpark and I am building a pilot project in Spark. I developed it in the PyCharm IDE, where it runs fine. Here is how the project works: I produce JSON messages to a Kafka topic, consume the topic in Spark, and convert the RDD values (which are JSON strings) to a DataFrame with productInfo = sqlContext.read.json(rdd). This works perfectly on my local machine. After converting the RDD to a DataFrame, I display the DataFrame on the console with the .show() method, which also works fine.
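
For reference, sqlContext.read.json(rdd) treats each string in the RDD as one JSON record and infers a schema from the whole batch. The pure-Python sketch below approximates only the top level of that step so the Kafka values can be checked offline (for example on the output of rdd.collect()): it gathers the union of top-level field names, sorted alphabetically as Spark's JSON schema inference does, and sets aside values that are not JSON objects, which Spark's default PERMISSIVE mode would generally report through a _corrupt_record column instead. The function name summarize_json_batch is hypothetical, and the sketch deliberately ignores type inference, nested fields and top-level arrays (which Spark expands into one row per element).

```python
import json
from typing import Iterable, List, Tuple


def summarize_json_batch(values: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Return (sorted top-level field names, values that are not JSON objects).

    Hypothetical helper: a rough, top-level-only approximation of what
    read.json would see in one micro-batch of Kafka message values.
    """
    fields = set()
    bad: List[str] = []
    for value in values:
        try:
            parsed = json.loads(value)
        except (TypeError, ValueError):
            # Not parseable as JSON at all (json.JSONDecodeError is a ValueError).
            bad.append(value)
            continue
        if isinstance(parsed, dict):
            fields.update(parsed.keys())
        else:
            # Valid JSON but not an object; this sketch does not expand arrays.
            bad.append(value)
    return sorted(fields), bad


if __name__ == "__main__":
    batch = ['{"name": "pen", "price": 2}', '{"id": 7, "name": "ink"}', "not json"]
    print(summarize_json_batch(batch))
    # (['id', 'name', 'price'], ['not json'])
```

If the second list is non-empty for real Kafka values, the DataFrame produced by read.json would be expected to contain corrupt-record rows rather than the intended columns.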

My problem arises when I set all of this up (Kafka, Apache Spark) on EC2 (Ubuntu 18.04.2 LTS) and run it with spark-submit: the console stops when it reaches my show() call and displays nothing, then the next batch starts and stops at show() again. I can't figure out what the error is, because no error is shown in the console. I also checked whether my data is arriving in the RDD, and it is.

{color:#FF0000}My Code:{color}

# -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import Row, DataFrame, SQLContext
import pandas as pd

def getSqlContextInstance(sparkContext):
    # Lazily create one SQLContext per driver process and reuse it across batches
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Cross-check that my data is present in the RDD (I checked by printing):
        # results = rdd.collect()
        # for result in results:
        #     print(result)

        # Get the singleton instance of SQLContext
        sqlContext = getSqlContextInstance(rdd.context)
        # Each RDD element is one JSON string; read.json infers the schema
        productInfo = sqlContext.read.json(rdd)

        # The problem comes here, when I try to show it
        productInfo.show()
    except:
        # NOTE: this bare except discards any exception raised above,
        # so a failure here never reaches the console
        pass

if __name__ == '__main__':
    conf = SparkConf().set("spark.cassandra.connection.host", "127.0.0.1")
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")
    sqlContext = SQLContext(sc)
    # 10-second batch interval
    ssc = StreamingContext(sc, 10)
    # Receiver-based Kafka stream: ZooKeeper quorum, consumer group id,
    # {topic: number of partitions to consume, each in its own thread}
    kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {'new_topic': 1})
    # Each Kafka record arrives as a (key, value) pair; keep only the JSON value
    lines = kafkaStream.map(lambda x: x[1])
    lines.foreachRDD(process)
    # lines.pprint()
    ssc.start()
    ssc.awaitTermination()
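
Whatever the root cause turns out to be, the bare except: pass in process() discards any exception raised by read.json() or show(), which would make a failure look exactly like "no error in the console". A minimal sketch, assuming the only goal is to make such errors visible: wrap the batch callback so the full traceback is printed to stderr before the stream moves on. The wrapper name report_errors is hypothetical, and it would not help if show() is blocked waiting for resources rather than raising.

```python
import functools
import sys
import traceback


def report_errors(callback):
    """Hypothetical wrapper for a foreachRDD-style callback taking (time, rdd).

    Instead of silently swallowing exceptions (as `except: pass` does),
    print the full traceback to stderr so the failure shows up in the
    spark-submit console, then return None so the next batch still runs.
    """
    @functools.wraps(callback)
    def wrapper(time, rdd):
        try:
            return callback(time, rdd)
        except Exception:
            traceback.print_exc(file=sys.stderr)
            return None
    return wrapper


if __name__ == "__main__":
    @report_errors
    def failing_batch(time, rdd):
        raise ValueError("simulated failure in show()")

    failing_batch("2019-07-10 11:15:50", None)  # traceback is printed to stderr
```

With the try/except removed from process(), the registration would become lines.foreachRDD(report_errors(process)); the wrapper keeps the two-argument (time, rdd) signature that foreachRDD passes to such callbacks.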

{color:#FF0000}My console:{color}

./spark-submit ReadingJsonFromKafkaAndWritingToScylla_CSV_Example.py
19/07/10 11:13:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/07/10 11:13:15 INFO SparkContext: Running Spark version 2.4.3
19/07/10 11:13:15 INFO SparkContext: Submitted application: ReadingJsonFromKafkaAndWritingToScylla_CSV_Example.py
19/07/10 11:13:15 INFO SecurityManager: Changing view acls to: kafka
19/07/10 11:13:15 INFO SecurityManager: Changing modify acls to: kafka
19/07/10 11:13:15 INFO SecurityManager: Changing view acls groups to: 
19/07/10 11:13:15 INFO SecurityManager: Changing modify acls groups to: 
19/07/10 11:13:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(kafka); groups with view permissions: Set(); users with modify permissions: Set(kafka); groups with modify permissions: Set()
19/07/10 11:13:16 INFO Utils: Successfully started service 'sparkDriver' on port 41655.
19/07/10 11:13:16 INFO SparkEnv: Registering MapOutputTracker
19/07/10 11:13:16 INFO SparkEnv: Registering BlockManagerMaster
19/07/10 11:13:16 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/07/10 11:13:16 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/07/10 11:13:16 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-33f848fe-88d7-4c8f-8440-8384e094c59c
19/07/10 11:13:16 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
19/07/10 11:13:16 INFO SparkEnv: Registering OutputCommitCoordinator
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
19/07/10 11:13:16 INFO Utils: Successfully started service 'SparkUI' on port 4046.
19/07/10 11:13:16 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://ip-172-31-92-134.ec2.internal:4046
19/07/10 11:13:16 INFO Executor: Starting executor ID driver on host localhost
19/07/10 11:13:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34719.
19/07/10 11:13:16 INFO NettyBlockTransferService: Server created on ip-172-31-92-134.ec2.internal:34719
19/07/10 11:13:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/07/10 11:13:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
19/07/10 11:13:16 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-31-92-134.ec2.internal:34719 with 366.3 MB RAM, BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
19/07/10 11:13:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
19/07/10 11:13:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
19/07/10 11:13:17 WARN AppInfo$: Can't read Kafka version from MANIFEST.MF. Possible cause: java.lang.NullPointerException
19/07/10 11:13:18 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:13:18 WARN BlockManager: Block input-0-1562757198000 replicated to only 0 peer(s) instead of 1 peers


{color:#FF0000}///////////////////This is while I am not producing any data to my Kafka topic//////////////////////{color}

========= 2019-07-10 11:13:20 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
========= 2019-07-10 11:13:30 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
++
||
++
++

------------------------after printing-----------------------
========= 2019-07-10 11:13:40 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
++
||
++
++

------------------------after printing-----------------------
========= 2019-07-10 11:15:40 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
++
||
++
++

------------------------after printing-----------------------
19/07/10 11:15:47 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:15:47 WARN BlockManager: Block input-0-1562757347200 replicated to only 0 peer(s) instead of 1 peers

{color:#FF0000}///////////////////This is when I start producing data to my Kafka topic//////////////////////{color}


========= 2019-07-10 11:15:50 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
19/07/10 11:15:52 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:15:52 WARN BlockManager: Block input-0-1562757352200 replicated to only 0 peer(s) instead of 1 peers
19/07/10 11:15:57 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:15:57 WARN BlockManager: Block input-0-1562757357200 replicated to only 0 peer(s) instead of 1 peers
========= 2019-07-10 11:16:00 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
19/07/10 11:16:02 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:16:02 WARN BlockManager: Block input-0-1562757362200 replicated to only 0 peer(s) instead of 1 peers
19/07/10 11:16:07 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:16:07 WARN BlockManager: Block input-0-1562757367400 replicated to only 0 peer(s) instead of 1 peers
========= 2019-07-10 11:16:10 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
19/07/10 11:16:12 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:16:12 WARN BlockManager: Block input-0-1562757372400 replicated to only 0 peer(s) instead of 1 peers
19/07/10 11:16:17 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:16:17 WARN BlockManager: Block input-0-1562757377400 replicated to only 0 peer(s) instead of 1 peers
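
One possible explanation worth checking (an assumption, not confirmed in this report): KafkaUtils.createStream is receiver-based, and each receiver permanently occupies one core. The log shows "Starting executor ID driver on host localhost", i.e. local mode, and spark-submit without --master defaults to local[*] unless spark.master is configured elsewhere. On an instance with a single vCPU that means one core, which the receiver takes, leaving nothing to run the tasks that read.json() and show() need once real data arrives. Empty batches need no tasks, which would fit the empty tables printed above. The Spark Streaming Programming Guide advises running locally with local[n] where n is greater than the number of receivers. The hypothetical helpers below only do that arithmetic for a local master string.

```python
import os
import re
from typing import Optional


def local_cores(master: str, cpu_count: Optional[int] = None) -> Optional[int]:
    """Task slots for a local master URL, or None if the master is not local.

    Handles "local", "local[N]" and "local[*]", plus the "local[N, F]" form
    that also sets max task failures. Hypothetical helper for illustration.
    """
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1
    if master == "local":
        return 1
    match = re.fullmatch(r"local\[(\*|\d+)(?:\s*,\s*\d+)?\]", master)
    if match is None:
        return None
    value = match.group(1)
    return cpu_count if value == "*" else int(value)


def cores_left_for_batches(master: str, receivers: int,
                           cpu_count: Optional[int] = None) -> Optional[int]:
    """Cores remaining after each receiver has taken one (hypothetical helper)."""
    cores = local_cores(master, cpu_count)
    if cores is None:
        return None
    return cores - receivers


if __name__ == "__main__":
    # One receiver (a single KafkaUtils.createStream call) on a 1-vCPU machine:
    print(cores_left_for_batches("local[*]", receivers=1, cpu_count=1))  # 0
    print(cores_left_for_batches("local[2]", receivers=1, cpu_count=1))  # 1
```

If the result is zero on the EC2 instance, a first experiment would be submitting with an explicit thread count, e.g. ./spark-submit --master "local[2]" ReadingJsonFromKafkaAndWritingToScylla_CSV_Example.py.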

I don't know how to figure this out. Can anyone help? It would be really appreciated.

Thank you



