Posted to user@spark.apache.org by Zhun Shen <sh...@gmail.com> on 2016/02/23 14:28:18 UTC

Use maxmind geoip lib to process ip on Spark/Spark Streaming

Hi all,

Currently I send nginx logs to Kafka, and I want to use Spark Streaming to parse the logs and enrich the IP info with the GeoIP libs from MaxMind.

I found this one: https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but Spark Streaming threw an error saying the lib was not Serializable.

Does anyone know a way to process the IP info in Spark Streaming? Many thanks.


RE: Use maxmind geoip lib to process ip on Spark/Spark Streaming

Posted by Silvio Fiorito <si...@granturing.com>.
I've used the code below with Spark SQL. I was using it with Spark 1.4, but it should still be good with 1.6. In this case I have a UDF to do the lookup; for Streaming you'd just apply the same lambda in a map function, so no UDF wrapper is needed.

import org.apache.spark.sql.functions._
import java.io.File
import java.net.InetAddress
import com.maxmind.geoip2._

object GeoIPLookup {
    // @transient lazy val: the reader is never serialized with a closure;
    // each executor JVM opens the database once, on first use.
    @transient lazy val reader = {
        val db = new File("/data/meetup/GeoLite2-City.mmdb")
        new DatabaseReader.Builder(db).build()
    }
}

case class Location(latitude: Double, longitude: Double)
case class Geo(city: String, country: String, loc: Location)

// UDF that resolves an IP string to city, country and coordinates.
val iplookup = udf { (s: String) => {
   val ip = InetAddress.getByName(s)
   val record = GeoIPLookup.reader.city(ip)

   val city = record.getCity
   val country = record.getCountry
   val location = record.getLocation

   Geo(city.getName, country.getName, Location(location.getLatitude, location.getLongitude))
} }

val withGeo = df.withColumn("geo", iplookup(column("ip")))
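
For Streaming, the same lookup in a plain map function might look roughly like the sketch below; ipStream is a made-up name for a DStream[String] of IP addresses parsed from the Kafka records, not something from this thread. Because GeoIPLookup is an object, the closure captures nothing non-serializable:

// Hypothetical streaming use of the same lookup (ipStream is assumed).
val geoStream = ipStream.map { s =>
  val record = GeoIPLookup.reader.city(InetAddress.getByName(s))
  Geo(record.getCity.getName,
      record.getCountry.getName,
      Location(record.getLocation.getLatitude, record.getLocation.getLongitude))
}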


Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming

Posted by Zhun Shen <sh...@gmail.com>.
Hi,

I checked the dependencies and fixed the bug. It works well on Spark but not on Spark Streaming, so I think I still need to find another way to do it.

 
Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming

Posted by Romain Sagean <ro...@hupi.fr>.
It seems like some libraries are missing. I'm not good at compiling and I don't know how to use Gradle, but for sbt I use the sbt-assembly plugin (https://github.com/sbt/sbt-assembly) to include all dependencies and make a fat jar. For Gradle I have found this: https://github.com/musketyr/gradle-fatjar-plugin.

My complete build.sbt for reference; with this in place, running "sbt assembly" builds the fat jar:


import AssemblyKeys._

name := "ON-3_geolocation"

version := "1.0"

scalaVersion := "2.10.4"

resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0",
  "com.snowplowanalytics" %% "scala-maxmind-iplookups" % "0.2.0"
)

retrieveManaged := true

assemblySettings

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")          => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")      => MergeStrategy.discard
  case "log4j.properties"                                  => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf"                                    => MergeStrategy.concat
  case _                                                   => MergeStrategy.first
}


-- 
Romain Sagean
romain.sagean@hupi.fr

Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming

Posted by Zhun Shen <sh...@gmail.com>.
Hi,

thanks for your advice. I tried your method; I use Gradle to manage my Scala code, and 'com.snowplowanalytics:scala-maxmind-iplookups:0.2.0' was imported via Gradle.

spark version: 1.6.0
scala: 2.10.4
scala-maxmind-iplookups: 0.2.0

I ran my test and got the error below:
java.lang.NoClassDefFoundError: scala/collection/JavaConversions$JMapWrapperLike
	at com.snowplowanalytics.maxmind.iplookups.IpLookups$.apply(IpLookups.scala:53)




Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming

Posted by romain sagean <ro...@hupi.fr>.
I realize I forgot the sbt part:

resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"

libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core" % "1.3.0",
   "com.snowplowanalytics"  %% "scala-maxmind-iplookups"  % "0.2.0"
)

Otherwise, to process streaming logs I use Logstash with Kafka as input. You can set Kafka as output if you need to do some extra calculation with Spark.
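
For the Kafka input on the Spark side, a minimal sketch using spark-streaming-kafka (Spark 1.x); the ZooKeeper quorum, group id and topic name here are made up for illustration:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(10))
// createStream yields (key, message) pairs; keep just the log line.
val lines = KafkaUtils.createStream(ssc, "zk-host:2181", "geoip-group", Map("nginx" -> 1))
  .map(_._2)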


Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming

Posted by Romain Sagean <ro...@hupi.fr>.
Hi,
I use MaxMind GeoIP with Spark (no streaming). To make it work you should use mapPartitions. I don't know if something similar exists for Spark Streaming.

my code for reference:

import org.apache.spark.SparkFiles
import com.snowplowanalytics.maxmind.iplookups.IpLookups

  // Resolve one IP to (country, city, latitude, longitude);
  // performLookups returns a tuple and _1 holds the geo lookup result.
  def parseIP(ip: String, ipLookups: IpLookups): List[String] = {
    val lookupResult = ipLookups.performLookups(ip)
    val countryName = lookupResult._1.map(_.countryName).getOrElse("")
    val city = lookupResult._1.map(_.city).getOrElse(None).getOrElse("")
    val latitude = lookupResult._1.map(_.latitude).getOrElse(None).toString
    val longitude = lookupResult._1.map(_.longitude).getOrElse(None).toString
    List(countryName, city, latitude, longitude)
  }

// Ship the GeoIP database file to every executor.
sc.addFile("/home/your_user/GeoLiteCity.dat")

// load your data in the my_data RDD

my_data.mapPartitions { rows =>
        // IpLookups is not serializable, so build it once per partition.
        val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
        rows.map { row => row ::: parseIP(row(3), ipLookups) }
}
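
A DStream does have mapPartitions as well, so the same per-partition construction should carry over to Spark Streaming more or less unchanged. A rough sketch, where lines stands in (hypothetically) for the DStream[List[String]] of parsed log rows produced by the Kafka stage:

// IpLookups is built per partition on the executors, never serialized.
val enriched = lines.mapPartitions { rows =>
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  rows.map { row => row ::: parseIP(row(3), ipLookups) }
}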
