Posted to user@spark.apache.org by Nick <ni...@gmail.com> on 2016/10/11 23:43:41 UTC
textFileStream dStream to DataFrame issues
I have a question about how to use textFileStream; I have included a code
snippet below. I am trying to read .gz files that are getting put into my
bucket. I do not want to specify the schema; I have a similar feature that
just does spark.read.json(inputBucket). This works great, and if I can get
that DStream input into a DataFrame I am golden. Can anyone point me to
what I am doing wrong? The files I am reading are continually added to that
folder as .gz, and within them are JSON objects. If I uncompress the .gz,
dStream.print() shows the content, but it is not turned into a
DataFrame as expected.
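
For reference, this is roughly the shape I am after: a minimal per-batch
sketch, assuming the Spark 2.0 DataFrameReader.json(RDD[String]) overload
(the jsonBatches name is just for illustration):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.DStream

def jsonBatches(dStream: DStream[String], spark: SparkSession): Unit = {
  dStream.foreachRDD { rdd: RDD[String] =>
    if (!rdd.isEmpty()) {
      // spark.read.json(RDD[String]) infers the schema per batch,
      // the same way spark.read.json(inputBucket) does for a static path
      val df: DataFrame = spark.read.json(rdd)
      df.printSchema()
      // per-batch processing would go here
    }
  }
}

Here is the code I actually have: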
def processStream(dStream: DStream[String], outputBucket: String, spark: SparkSession): Unit = {
  // Declared as df in my first draft; renamed jsonDf so the references below match
  var jsonDf: DataFrame = null
  dStream.foreachRDD { rdd =>
    import spark.implicits._
    if (jsonDf != null) {
      // If I do
      //   println("RDD: " + rdd.toDF().printSchema())
      // I get nothing when reading from a .gz, just an empty DataFrame.
      // If I uncompress that .gz I get some content, but everything is in one column.
      jsonDf = jsonDf.union(rdd.toDF())
    } else {
      jsonDf = rdd.toDF()
    }
  }
  if (jsonDf != null) {
    // Some processing
  }
}
val spark = SparkSession.builder()
  .appName("Some Name")
  .config("spark.sql.hive.thriftServer.singleSession", "true")
  .getOrCreate()
val context = spark.sparkContext
val ssc = new StreamingContext(context, Seconds(10))

val dStream = ssc.textFileStream(
  "/Users/userName/Documents/SparkStreamingTestfolder/*/*/*/*")

processStream(dStream, "/Users/userName/destination/folder/streamTest", spark)

ssc.start()
ssc.awaitTermination()