Posted to user@spark.apache.org by Tim Smith <se...@gmail.com> on 2017/04/06 04:47:15 UTC

Consuming AWS Cloudwatch logs from Kinesis into Spark

I am sharing this code snippet since I spent quite some time figuring it
out and couldn't find any examples online. Between the Kinesis
documentation, the tutorial on the AWS site and other code snippets on the
Internet, I was confused about the structure/format of the messages that
Spark fetches from Kinesis - base64 encoded, JSON, gzipped - which layer
comes first and in what order to unwrap them.
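
To save you the same head-scratching, here is the order as I understand it
from the AWS CloudWatch Logs subscription docs: the Spark Kinesis receiver
already hands you each record as a plain Array[Byte], so the base64 step is
only something you see at the API/CLI level and you never decode it
yourself. Those bytes are a gzip-compressed CloudWatch Logs subscription
payload, and gunzipping them gives you JSON roughly like the example below
(all values are made-up placeholders; extractedFields only appears if your
subscription filter pattern actually extracts fields):

{
  "messageType": "DATA_MESSAGE",
  "owner": "111122223333",
  "logGroup": "my-log-group",
  "logStream": "my-log-stream",
  "subscriptionFilters": ["my-filter"],
  "logEvents": [
    {
      "id": "01234567890123456789",
      "timestamp": 1491451635000,
      "message": "127.0.0.1 GET /index.html 200",
      "extractedFields": { "ip": "127.0.0.1", "status": "200" }
    }
  ]
}

That is why the snippet below gunzips first, then parses the JSON, then
explodes logEvents.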

I tested this on EMR-5.4.0 with Amazon Hadoop 2.7.3 and Spark 2.1.0. Hope it
helps others googling for similar info. I tried using Structured Streaming
but (1) it is still in alpha and (2) despite including what I thought were
all the dependencies, it complained about not finding DataSource.Kinesis.
You probably do not need all the imports below, but I am too lazy to strip
out the ones the snippet doesn't require :)
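
One note on dependencies: the classic Kinesis receiver lives in its own
artifact, so if the org.apache.spark.streaming.kinesis._ import below does
not resolve in your spark-shell, you can pull it in with --packages. The
coordinates here are for Scala 2.11 / Spark 2.1.0 - adjust them to match
your cluster:

spark-shell --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0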

import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
import java.util.Base64
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.explode
import org.apache.commons.math3.stat.descriptive._
import java.io.File
import java.net.InetAddress
import scala.util.control.NonFatal
import org.apache.spark.SparkFiles
import org.apache.spark.sql.SaveMode
import java.util.Properties
import org.json4s._
import org.json4s.jackson.JsonMethods._
import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import java.util.zip.{GZIPOutputStream, GZIPInputStream}
import scala.util.Try


//sc.setLogLevel("INFO")

// spark-shell on EMR already provides sc, spark and spark.implicits._
// (the implicits are what make the $"col" syntax below compile)
val ssc = new StreamingContext(sc, Seconds(30))

val kinesisStreams = (0 until 2).map { i =>
  // Two receivers; "myApp" doubles as the KCL application / DynamoDB checkpoint table name
  KinesisUtils.createStream(ssc, "myApp", "cloudwatchlogs",
    "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
    InitialPositionInStream.LATEST, Seconds(30),
    StorageLevel.MEMORY_AND_DISK_2, "myId", "mySecret")
}

val unionStreams = ssc.union(kinesisStreams)

unionStreams.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
  if (rdd.count() > 0) {
    // Each record is a gzipped CloudWatch Logs subscription payload:
    // gunzip it and keep the JSON as a compact string.
    val json = rdd.map(input => {
      val inputStream = new GZIPInputStream(new ByteArrayInputStream(input))
      val record = scala.io.Source.fromInputStream(inputStream).mkString
      compact(render(parse(record)))
    })

    // One row per log event, keeping the log group name and the fields
    // extracted by the subscription filter.
    val df = spark.sqlContext.read.json(json)
    val preDF = df.select($"logGroup", explode($"logEvents").as("events_flat"))
    val penDF = preDF.select($"logGroup", $"events_flat.extractedFields")
    val finalDF = penDF.select($"logGroup".as("cluster"), $"extractedFields.*")
    finalDF.printSchema()
    finalDF.show()
  }
})

ssc.start
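
(In the shell, ssc.start alone is fine since the REPL keeps the driver
alive. If you package this as an application instead, you would typically
block the driver after starting the context, roughly:

ssc.start()
ssc.awaitTermination()
)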



--
Thanks,

Tim