Posted to user@spark.apache.org by Guillermo Ortiz <ko...@gmail.com> on 2014/12/26 10:56:24 UTC
Spark Streaming and Windows, it always counts the logs during all the
windows. Why?
I'm trying to run some operations with windows and intervals.
I get data every 15 seconds, and I want a window of 60 seconds
with a batch interval of 15 seconds.
I'm injecting data with ncat. If I inject 3 logs in the same interval,
I get into the "do something" branch every 15 seconds for one minute.
I understand that I get into "do something" in the first interval, but the
logs shouldn't appear in the next interval. Why do I get into "do
something" in all the intervals for a minute? What am I doing wrong?
val sparkConf = new SparkConf().setMaster(sparkMode).setAppName("MiApp");
val ssc = new StreamingContext(sparkConf, Seconds(15));
val lines = ssc.socketTextStream("localhost", sparkPort.toInt);
ssc.checkpoint(sparkCheckPoint)
ruleSqlInjection(lines)
ssc.start()
ssc.awaitTermination()
def ruleSqlInjection(lines: ReceiverInputDStream[String]) = {
  val filterSql = lines.filter(line => line.contains("SQL"))
  val jsonSql = filterSql.map(line => JsonUtil.read(line.getBytes(), classOf[Model]))
  // Key each parsed record by "<client IP>_<request host>"
  val groupSql = jsonSql.map { json =>
    val srcIp = json.getMessage().getCliIP()
    val srcURL = json.getMessage().getReqHost()
    (srcIp + "_" + srcURL, json)
  }
  val errorLinesValueReduce = groupSql.groupByKeyAndWindow(Seconds(60), Seconds(15))
  errorLinesValueReduce.foreachRDD { rdd =>
    val elem1 = rdd.take(1)
    if (elem1.nonEmpty) { // check first: elem1(0) throws on an empty window
      println("take1 ->" + elem1(0)._1)
      // it's always getting the logs for the first 15 seconds during one minute..
      println("take2 ->" + elem1(0)._2)
      val alerts = elem1(0)._2
      if (alerts.size > 2) {
        // I don't understand why it's getting into here 4 intervals
        println("do something")
      }
    }
  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org
Re: Spark Streaming and Windows, it always counts the logs during all
the windows. Why?
Posted by Guillermo Ortiz <ko...@gmail.com>.
Oh, I didn't understand what I was doing, my fault (too many parties
this Christmas). I thought windows worked in some other, weird way. Sorry
for the questions.
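For readers who land on this thread with the same question: a window of Seconds(60) that slides every Seconds(15) covers the last four 15-second batches each time it fires, so logs from a single batch are expected to show up in four consecutive windows. The sketch below only simulates that arithmetic in plain Scala; it does not use Spark, and the names WindowSketch, windows, windowLen and slide are made up for illustration.

```scala
// Plain-Scala simulation, no Spark required. Each element of `batches`
// stands for one 15-second batch; the strings are hypothetical log lines.
object WindowSketch {
  // For every slide point, returns the items covered by the window ending there.
  // windowLen and slide are measured in batches (60s / 15s = 4, 15s / 15s = 1).
  def windows[A](batches: Vector[Seq[A]], windowLen: Int, slide: Int): Vector[Seq[A]] =
    (slide to batches.length by slide).toVector.map { end =>
      batches.slice(math.max(0, end - windowLen), end).flatten
    }

  def main(args: Array[String]): Unit = {
    // Three logs arrive in the first batch, then nothing for the next five batches.
    val batches: Vector[Seq[String]] =
      Vector(Seq("log1", "log2", "log3"), Nil, Nil, Nil, Nil, Nil)

    // Window 60s, slide 15s: overlapping windows, the same logs are seen repeatedly.
    val sliding = windows(batches, windowLen = 4, slide = 1)
    sliding.zipWithIndex.foreach { case (w, i) =>
      println(s"window ending at ${(i + 1) * 15}s -> ${w.size} logs")
    }

    // Window 60s, slide 60s: non-overlapping windows, each log is seen once.
    val tumbling = windows(batches, windowLen = 4, slide = 4)
    println(s"non-overlapping windows -> ${tumbling.map(_.size)}")
  }
}
```

If the goal is to react to each log only once, one option is to make the slide interval equal to the window length, for example groupByKeyAndWindow(Seconds(60), Seconds(60)), at the cost of firing only once per minute.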
Re: Spark Streaming and Windows, it always counts the logs during all
the windows. Why?
Posted by Guillermo Ortiz <ko...@gmail.com>.
I'm trying to understand why it's not working, so I added some println
calls to check what the code was executing.
def ruleSqlInjection(lines: ReceiverInputDStream[String]) = {
  println("1") // Just one time, when I start the program
  val filterSql = lines.filter(line => line.contains("SQL"))
  val jsonSql = filterSql.map(line => JsonUtil.read(line.getBytes(), classOf[Akamai]))
  val groupSql = jsonSql.map { json =>
    val srcIp = json.getMessage().getCliIP()
    val srcURL = json.getMessage().getReqHost()
    (srcIp + "_" + srcURL, json)
  }
  println("2") // Just one time, when I start the program
  val errorLinesValueReduce = groupSql.groupByKeyAndWindow(Seconds(60), Seconds(15))
  println("3") // Just one time, when I start the program
  errorLinesValueReduce.foreachRDD { rdd =>
    rdd.foreach { elem1 =>
      println("4 " + elem1) // All the time
      if (elem1._2.size > 0) {
        println("do something")
      }
    }
  }
  println("fin foreachRdd") // Just one time, when I start the program
}
Why is it only executing the println("4...")? Shouldn't it execute all
the code every 15 seconds, since that is the interval defined on the context
(val ssc = new StreamingContext(sparkConf, Seconds(15));)?
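A likely explanation for the output above: the body of ruleSqlInjection only declares the pipeline, so it runs once at startup, while the function passed to foreachRDD is what gets invoked for every batch after ssc.start(). The sketch below imitates that split in plain Scala. MiniStream, foreachBatch and runBatch are hypothetical stand-ins invented for this illustration and are not part of Spark's API.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a DStream: it only remembers what to do per batch.
// This is NOT Spark's API, just an illustration of the same idea.
final class MiniStream[A] {
  private val actions = ArrayBuffer.empty[Seq[A] => Unit]

  // Plays the role of foreachRDD: registers the function, does not run it.
  def foreachBatch(f: Seq[A] => Unit): Unit = actions += f

  // Plays the role of the scheduler firing once per batch interval.
  def runBatch(batch: Seq[A]): Unit = actions.foreach(_(batch))
}

object SetupVsBatch {
  // Records the order in which things execute.
  val trace = ArrayBuffer.empty[String]

  def rule(stream: MiniStream[String]): Unit = {
    trace += "setup start"                      // runs once, like println("1")
    stream.foreachBatch { batch =>
      trace += s"batch with ${batch.size} logs" // runs per batch, like println("4 ...")
    }
    trace += "setup end"                        // runs once, like println("fin foreachRdd")
  }

  def main(args: Array[String]): Unit = {
    val stream = new MiniStream[String]
    rule(stream)                                // wiring happens once
    stream.runBatch(Seq("a", "b", "c"))         // first simulated 15-second tick
    stream.runBatch(Seq.empty)                  // second simulated tick
    trace.foreach(println)
  }
}
```

In this sketch both setup entries are recorded before any batch entry, which mirrors the println("1"), println("2"), println("3") and println("fin foreachRdd") lines appearing only once in the real program.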