Posted to user@spark.apache.org by Sunil Kumar Chinnamgari <su...@yahoo.com.INVALID> on 2016/07/28 17:28:49 UTC

Re: Unable to create a dataframe from json dstream using pyspark

Hi,



I am trying to create a DataFrame from JSON arriving in a DStream, but the code below does not produce the DataFrame I expect:

import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext

def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

if __name__ == "__main__":
    if len(sys.argv) != 3:
        raise IOError("Invalid usage; the correct format is:\nquadrant_count.py <hostname> <port>")

# Initialize a SparkContext with a name
spc = SparkContext(appName="jsonread")
sqlContext = SQLContext(spc)
# Create a StreamingContext with a batch interval of 2 seconds
stc = StreamingContext(spc, 2)
# Checkpointing feature
stc.checkpoint("checkpoint")
# Creating a DStream to connect to hostname:port (like localhost:9999)
lines = stc.socketTextStream(sys.argv[1], int(sys.argv[2]))
lines.pprint()
parsed = lines.map(lambda x: json.loads(x))

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SQLContext
        sqlContext = getSqlContextInstance(rdd.context)
        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = sqlContext.createDataFrame(rowRdd)
        # Register as table
        wordsDataFrame.registerTempTable("mytable")
        testDataFrame = sqlContext.sql("select summary from mytable")
        print(testDataFrame.show())
        print(testDataFrame.printSchema())
    except:
        pass

parsed.foreachRDD(process)
stc.start()
# Wait for the computation to terminate
stc.awaitTermination()

The script runs without reporting any errors and reads the JSON from the streaming context successfully, but it prints neither the values of summary nor the DataFrame schema.
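One thing that may be worth checking, offered as a guess rather than a confirmed cause: the bare except in process() would hide any exception raised inside the try block, including a NameError if Row is used without being imported. The plain-Python sketch below (no Spark involved; process_quietly and process_loudly are hypothetical names used only for illustration) shows how the same failure looks with and without the silent handler.

```python
import traceback

def process_quietly(records):
    """Mimics the try/except in process(): any failure is silently dropped."""
    try:
        # Row is deliberately not imported here, so this raises NameError
        return [Row(word=r) for r in records]
    except:
        pass

def process_loudly(records):
    """Same body, but the exception is reported instead of swallowed."""
    try:
        return [Row(word=r) for r in records]
    except Exception as exc:
        traceback.print_exc()  # writes the full traceback to stderr
        return exc

quiet = process_quietly(["a", "b"])
loud = process_loudly(["a", "b"])
print(quiet)                 # None: the failure left no trace
print(type(loud).__name__)   # NameError
```

Replacing the bare except in the streaming job with a handler that prints the traceback should make it visible whether something similar is happening there.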
Example JSON I am attempting to read:
{"reviewerID": "A2IBPI20UZIR0U", "asin": "1384719342", "reviewerName": "cassandra tu \"Yeah, well, that's just like, u...", "helpful": [0, 0], "reviewText": "Not much to write about here, but it does exactly what it's supposed to. filters out the pop sounds. now my recordings are much more crisp. it is one of the lowest prices pop filters on amazon so might as well buy it, they honestly work the same despite their pricing,", "overall": 5.0, "summary": "good", "unixReviewTime": 1393545600, "reviewTime": "02 28, 2014"}
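To show what the parsed stream would hand to process(), here is a small sketch using only the standard json module. It assumes each line on the socket is one complete JSON object, and it uses a trimmed copy of the record above (four of its fields) to keep the example short.

```python
import json

# Trimmed copy of the sample record above (only four of its fields kept)
line = '{"reviewerID": "A2IBPI20UZIR0U", "asin": "1384719342", "overall": 5.0, "summary": "good"}'

record = json.loads(line)       # what lines.map(json.loads) yields per line
print(type(record).__name__)    # dict
print(record["summary"])        # good

# Wrapping the whole dict in a single field, as Row(word=w) would, leaves no
# top-level "summary" key; the dict's own keys do include it.
wrapped = {"word": record}
print("summary" in wrapped)     # False
print("summary" in record)      # True
```

If that reading is right, a query for summary would need the record's keys to become the columns. In PySpark, Row(**record) from pyspark.sql might be one way to do that, though I have not verified it against the streaming job.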
I am an absolute newcomer to Spark Streaming and have started working on pet projects by reading the documentation. Any help and guidance is greatly appreciated.
Best Regards,
Sunil Kumar Chinnamgari