Posted to user@spark.apache.org by Guillermo Ortiz <ko...@gmail.com> on 2014/12/26 10:56:24 UTC

Spark Streaming and Windows, it always counts the logs during all the windows. Why?

I'm trying to do some operations with windows and intervals.

I get data every 15 seconds and want a window of 60 seconds with a
batch interval of 15 seconds.
I'm injecting data with ncat. If I inject 3 logs in the same interval,
I hit "do something" every 15 seconds for one minute. I understand
hitting "do something" in the first interval, but the logs shouldn't
show up in the following intervals. Why do I get into "do something" in
all the intervals of the minute? What am I doing wrong?


    val sparkConf = new SparkConf().setMaster(sparkMode).setAppName("MiApp");
    val ssc = new StreamingContext(sparkConf, Seconds(15));
    val lines = ssc.socketTextStream("localhost", sparkPort.toInt);
    ssc.checkpoint(sparkCheckPoint)

    ruleSqlInjection(lines)

    ssc.start()
    ssc.awaitTermination()



  def ruleSqlInjection(lines: ReceiverInputDStream[String]) = {
    val filterSql = lines.filter(line => line.contains("SQL"))
    val jsonSql = filterSql.map(line => JsonUtil.read(line.getBytes(), classOf[Model]))
    val groupSql = jsonSql.map {
      json =>
        val srcIp = json.getMessage().getCliIP()
        val srcURL = json.getMessage().getReqHost()
        (srcIp + "_" + srcURL, json)
    }
    val errorLinesValueReduce = groupSql.groupByKeyAndWindow(Seconds(60), Seconds(15))
    errorLinesValueReduce.foreachRDD {
      rdd =>
        val elem1 = rdd.take(1)
        if (elem1.size > 0) {  // guard before indexing: take(1) is empty when the window has no data
          println("take1 ->" + elem1(0)._1)
          println("take2 ->" + elem1(0)._2)  // it's always getting the logs for the first 15 seconds during one minute..
          val alerts = elem1(0)._2
          if (alerts.size > 2) {
            println("do something")  // I don't understand why it's getting into here 4 intervals
          }
        }
    }
  }
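
As an aside (not from the original thread): a minimal sketch of the same rule
written as a per-key count over the window, so nothing has to be pulled to the
driver with take(1). It reuses JsonUtil and Model from the code above, and the
extra import is the Spark 1.x one for the pair-DStream operations.

    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext._   // pair-DStream operations (Spark 1.x)
    import org.apache.spark.streaming.dstream.DStream

    def ruleSqlInjectionCounted(lines: DStream[String]) = {
      val keyed = lines
        .filter(_.contains("SQL"))
        .map { line =>
          val json = JsonUtil.read(line.getBytes(), classOf[Model])
          (json.getMessage().getCliIP() + "_" + json.getMessage().getReqHost(), 1)
        }

      // One count per "srcIp_srcUrl" key over the last 60 seconds, recomputed every 15 seconds.
      val counted = keyed.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(15))

      // Only keys with more than 2 hits inside the window trigger the alert.
      counted.filter { case (_, hits) => hits > 2 }.foreachRDD { rdd =>
        rdd.collect().foreach { case (key, hits) =>
          println("do something for " + key + " (" + hits + " hits in the last minute)")
        }
      }
    }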



Re: Spark Streaming and Windows, it always counts the logs during all the windows. Why?

Posted by Guillermo Ortiz <ko...@gmail.com>.
Oh, I didn't understand what I was doing, my fault (too many parties
this Christmas). I thought windows worked in a different, stranger way.
Sorry for the questions.
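
For the archive, this is the expected behaviour of a sliding window: with a
window of 60 seconds sliding every 15 seconds, a record received in one batch
stays inside the window for 60 / 15 = 4 consecutive windowed RDDs. A minimal
sketch showing it (local[2] and port 9999 are placeholders, not the values
from my real program):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowSketch")
    val ssc = new StreamingContext(conf, Seconds(15))

    val lines = ssc.socketTextStream("localhost", 9999)   // feed it with ncat
    val sqlLines = lines.filter(_.contains("SQL"))

    // Every 15 seconds this prints a count covering the *last 60 seconds*:
    // inject 3 matching lines once and the output stays at 3 for four
    // batches before dropping back to 0.
    sqlLines.window(Seconds(60), Seconds(15)).count().print()

    ssc.start()
    ssc.awaitTermination()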



Re: Spark Streaming and Windows, it always counts the logs during all the windows. Why?

Posted by Guillermo Ortiz <ko...@gmail.com>.
I'm trying to understand why it's not working, so I added some println
calls to check which parts of the code were executing.

  def ruleSqlInjection(lines: ReceiverInputDStream[String]) = {
    println("1"); //******************** Just one time, when I start the program
    val filterSql = lines.filter(line => line.contains("SQL"))
    val jsonSql = filterSql.map(line => JsonUtil.read(line.getBytes(), classOf[Akamai]))
    val groupSql = jsonSql.map {
      json =>
        val srcIp = json.getMessage().getCliIP()
        val srcURL = json.getMessage().getReqHost()
        (srcIp + "_" + srcURL, json)
    }
    println("2"); //******************** Just one time, when I start the program

    val errorLinesValueReduce = groupSql.groupByKeyAndWindow(Seconds(60), Seconds(15))

    println("3"); //******************** Just one time, when I start the program
    errorLinesValueReduce.foreachRDD {
      rdd =>
        rdd.foreach { elem1 =>
          println("4 " + elem1); //******************** Every batch interval
          if (elem1._2.size > 0) {
            println("do something")
          }
        }
    }
    println("fin foreachRdd");  //******************** Just one time, when I start the program
  }


Why is it only executing the println("4 ...")? Shouldn't it execute all
the code every 15 seconds, since that is what is defined on the context
(val ssc = new StreamingContext(sparkConf, Seconds(15)))?
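
For reference, a minimal sketch of which parts of a streaming program run once
and which run for every batch (the marker strings, local[2] and localhost:9999
are placeholders): everything outside foreachRDD only builds the DStream graph,
so it executes once at startup; only the closures passed to output operations
run again for each batch.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("SetupVsBatch")
    val ssc = new StreamingContext(conf, Seconds(15))
    val lines = ssc.socketTextStream("localhost", 9999)

    // Runs once: this only *defines* the DStream pipeline.
    println("setup: building the pipeline")
    val filtered = lines.filter(_.contains("SQL"))

    filtered.foreachRDD { rdd =>
      // Runs on the driver for every 15-second batch.
      println("driver: new batch with " + rdd.count() + " matching lines")
      rdd.foreach { line =>
        // Runs on the executors, once per record in the batch.
        println("executor: " + line)
      }
    }

    // Runs once: nothing has executed yet at this point.
    println("setup: pipeline defined")
    ssc.start()            // batches start only from here on
    ssc.awaitTermination()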


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org