You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Nick <ni...@gmail.com> on 2016/10/11 23:43:41 UTC

textFileStream dStream to DataFrame issues

I have question about how to use textFileStream, I have included a code
snippet below.  I am trying to read .gz files that are getting put into my
bucket.  I do not want to specify the schema, I have a similar feature that
just does spark.read.json(inputBucket).  This works great and if I can get
that DStream input into a DataFrame I am golden.  Can anyone point me to
what I am doing wrong?  The files I am reading get continually add to that
folder as .gz within them are json objects.  If I uncompress the .gz the
dStream.print() shows the content but it is not turning it into the
DataFrame as expect.





def processStream(dStream: DStream[String], outputBucket: String, spark:
SparkSession) {

var df:DataFrame = null

dStream.foreachRDD { rdd =>

import spark.implicits._

if (jsonDf != null) {

//If I do

//println("RDD: " + rdd.toDF().printSchema())

//I get nothing when reading from an .gz, I just get an empty dataFrame

//If uncompress that .gz I get some content everything in one column

jsonDf = jsonDf.union(rdd.toDF())

} else {

jsonDf = rdd.toDF()

}

}

if (jsonDf != null)

//Some processing

}



val spark = SparkSession.builder().appName("Some
Name").config("spark.sql.hive.thriftServer.singleSession",
"true").getOrCreate()

val context = spark.sparkContext

val ssc = new StreamingContext(context, Seconds(10))

val dStream =
ssc.textFileStream("/Users/userName/Documents/SparkStreamingTestfolder/" +
"*/*/*/*")



processStream(dStream, "/Users/userName/destination/folder /streamTest",
spark)



ssc.start()

ssc.awaitTermination()