You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Noppanit Charassinvichai <no...@gmail.com> on 2017/07/19 00:54:51 UTC

[Spark Streaming] How to make this code work?

I'm super new to Spark and I'm writing this job to parse nginx logs into the
ORC file format so they can be read from Presto. We wrote LogLine2Json, which
parses a line of nginx log into JSON, and that works fine.

    val sqs = streamContext.receiverStream(new SQSReceiver("elb")
      //.credentials("key", "secret")
      .at(Regions.US_EAST_1)
      .withTimeout(5))

    val jsonRows = sqs.mapPartitions(partitions => {
      val sqlSession = SparkSession
        .builder()
        .getOrCreate()

      val s3Client = new AmazonS3Client(new
BasicCredentialsProvider(sys.env("AWS_ACCESS_KEY_ID"),
sys.env("AWS_SECRET_ACCESS_KEY")))

      val txfm = new LogLine2Json
      val log = Logger.getLogger("parseLog")

      partitions.map(messages => {
        val sqsMsg = Json.parse(messages)
        System.out.println(sqsMsg)

        val bucketName =
Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"",
"")
        val key =
Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"",
"")
        System.out.println(bucketName)
        System.out.println(key)
        val obj = s3Client.getObject(new GetObjectRequest(bucketName, key))
        val stream = obj.getObjectContent()
        scala.io.Source.fromInputStream(stream).getLines().map(line => {
            try{
              val l = txfm.parseLine(line)
              sqlSession.read.schema(schema.schema).json(l)

            }
            catch {
              case e: Throwable => {log.info(line); "";}
            }
          }).filter(line => line != "{}")
      })
    })


    jsonRows.foreachRDD(r => {
r.write.mode("append").format("orc").option("compression","zlib").save("/tmp/spark/presto")
    })

    streamContext.start()
    streamContext.awaitTermination()
  }

The code is supposed to read an S3 key off of SQS, read the file at that key,
parse it, and save the result as ORC. However, I can't get this to compile. It
complains that r doesn't have a write method on this line:

r.write.mode("append").format("orc").option("compression","zlib").save("/tmp/spark/presto")

Please help.
Thanks a lot.