You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Romain Sagean <ro...@hupi.fr> on 2016/02/23 15:07:47 UTC
Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming
Hi,
I use maxmind geoip with spark (no streaming). To make it work you should
use mapPartition. I don't know if something similar exist for spark
streaming.
my code for reference:
def parseIP(ip:String, ipLookups: IpLookups): List[String] = {
val lookupResult = ipLookups.performLookups(ip)
val countryName = (lookupResult._1).map(_.countryName).getOrElse("")
val city = (lookupResult._1).map(_.city).getOrElse(None).getOrElse("")
val latitude =
(lookupResult._1).map(_.latitude).getOrElse(None).toString
val longitude =
(lookupResult._1).map(_.longitude).getOrElse(None).toString
return List(countryName, city, latitude, longitude)
}
sc.addFile("/home/your_user/GeoLiteCity.dat")
//load your data in my_data rdd
my_data.mapPartitions { rows =>
val ipLookups = IpLookups(geoFile =
Some(SparkFiles.get("GeoLiteCity.dat")))
rows.map { row => row ::: parseIP(row(3),ipLookups) }
}
Le 23/02/2016 14:28, Zhun Shen a écrit :
Hi all,
Currently, I sent nginx log to Kafka then I want to use Spark Streaming to
parse the log and enrich the IP info with geoip libs from Maxmind.
I found this one https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git,
but spark streaming throw error and told that the lib was not Serializable.
Does anyone there way to process the IP info in Spark Streaming? Many
thanks.
RE: Use maxmind geoip lib to process ip on Spark/Spark Streaming
Posted by Silvio Fiorito <si...@granturing.com>.
I’ve used the code below with SparkSQL. I was using this with Spark 1.4 but should still be good with 1.6. In this case I have a UDF to do the lookup, but for Streaming you’d just have a lambda to apply in a map function, so no UDF wrapper.
import org.apache.spark.sql.functions._
import java.io.File
import java.net.InetAddress
import com.maxmind.geoip2._
object GeoIPLookup {
@transient lazy val reader = {
val db = new File("/data/meetup/GeoLite2-City.mmdb")
val reader = new DatabaseReader.Builder(db).build()
reader
}
}
case class Location(latitude: Double, longitude: Double)
case class Geo(city: String, country: String, loc: Location)
val iplookup = udf { (s: String) => {
val ip = InetAddress.getByName(s)
val record = GeoIPLookup.reader.city(ip)
val city = record.getCity
val country = record.getCountry
val location = record.getLocation
Geo(city.getName, country.getName, Location(location.getLatitude, location.getLongitude))
} }
val withGeo = df.withColumn("geo", iplookup(column("ip")))
From: Zhun Shen<ma...@gmail.com>
Sent: Monday, February 29, 2016 11:17 PM
To: romain sagean<ma...@hupi.fr>
Cc: user<ma...@spark.apache.org>
Subject: Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming
Hi,
I check the dependencies and fix the bug. It work well on Spark but not on Spark Streaming. So I think I still need find another way to do it.
On Feb 26, 2016, at 2:47 PM, Zhun Shen <sh...@gmail.com>> wrote:
Hi,
thanks for you advice. I tried your method, I use Gradle to manage my scala code. 'com.snowplowanalytics:scala-maxmind-iplookups:0.2.0’ was imported in Gradle.
spark version: 1.6.0
scala: 2.10.4
scala-maxmind-iplookups: 0.2.0
I run my test, got the error as below:
java.lang.NoClassDefFoundError: scala/collection/JavaConversions$JMapWrapperLike
at com.snowplowanalytics.maxmind.iplookups.IpLookups$.apply(IpLookups.scala:53)
On Feb 24, 2016, at 1:10 AM, romain sagean <ro...@hupi.fr>> wrote:
I realize I forgot the sbt part
resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"<http://maven.snplow.com/releases/>
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.3.0",
"com.snowplowanalytics" %% "scala-maxmind-iplookups" % "0.2.0"
)
otherwise, to process streaming log I use logstash with kafka as input. You can set kafka as output if you need to do some extra calculation with spark.
Le 23/02/2016 15:07, Romain Sagean a écrit :
Hi,
I use maxmind geoip with spark (no streaming). To make it work you should use mapPartition. I don't know if something similar exist for spark streaming.
my code for reference:
def parseIP(ip:String, ipLookups: IpLookups): List[String] = {
val lookupResult = ipLookups.performLookups(ip)
val countryName = (lookupResult._1).map(_.countryName).getOrElse("")
val city = (lookupResult._1).map(_.city).getOrElse(None).getOrElse("")
val latitude = (lookupResult._1).map(_.latitude).getOrElse(None).toString
val longitude = (lookupResult._1).map(_.longitude).getOrElse(None).toString
return List(countryName, city, latitude, longitude)
}
sc.addFile("/home/your_user/GeoLiteCity.dat")
//load your data in my_data rdd
my_data.mapPartitions { rows =>
val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
rows.map { row => row ::: parseIP(row(3),ipLookups) }
}
Le 23/02/2016 14:28, Zhun Shen a écrit :
Hi all,
Currently, I sent nginx log to Kafka then I want to use Spark Streaming to parse the log and enrich the IP info with geoip libs from Maxmind.
I found this one <https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git> https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but spark streaming throw error and told that the lib was not Serializable.
Does anyone there way to process the IP info in Spark Streaming? Many thanks.
Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming
Posted by Zhun Shen <sh...@gmail.com>.
Hi,
I check the dependencies and fix the bug. It work well on Spark but not on Spark Streaming. So I think I still need find another way to do it.
> On Feb 26, 2016, at 2:47 PM, Zhun Shen <sh...@gmail.com> wrote:
>
> Hi,
>
> thanks for you advice. I tried your method, I use Gradle to manage my scala code. 'com.snowplowanalytics:scala-maxmind-iplookups:0.2.0’ was imported in Gradle.
>
> spark version: 1.6.0
> scala: 2.10.4
> scala-maxmind-iplookups: 0.2.0
>
> I run my test, got the error as below:
> java.lang.NoClassDefFoundError: scala/collection/JavaConversions$JMapWrapperLike
> at com.snowplowanalytics.maxmind.iplookups.IpLookups$.apply(IpLookups.scala:53)
>
>
>
>
>> On Feb 24, 2016, at 1:10 AM, romain sagean <romain.sagean@hupi.fr <ma...@hupi.fr>> wrote:
>>
>> I realize I forgot the sbt part
>>
>> resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/" <http://maven.snplow.com/releases/>
>>
>> libraryDependencies ++= Seq(
>> "org.apache.spark" %% "spark-core" % "1.3.0",
>> "com.snowplowanalytics" %% "scala-maxmind-iplookups" % "0.2.0"
>> )
>>
>> otherwise, to process streaming log I use logstash with kafka as input. You can set kafka as output if you need to do some extra calculation with spark.
>>
>> Le 23/02/2016 15:07, Romain Sagean a écrit :
>>> Hi,
>>> I use maxmind geoip with spark (no streaming). To make it work you should use mapPartition. I don't know if something similar exist for spark streaming.
>>>
>>> my code for reference:
>>>
>>> def parseIP(ip:String, ipLookups: IpLookups): List[String] = {
>>> val lookupResult = ipLookups.performLookups(ip)
>>> val countryName = (lookupResult._1).map(_.countryName).getOrElse("")
>>> val city = (lookupResult._1).map(_.city).getOrElse(None).getOrElse("")
>>> val latitude = (lookupResult._1).map(_.latitude).getOrElse(None).toString
>>> val longitude = (lookupResult._1).map(_.longitude).getOrElse(None).toString
>>> return List(countryName, city, latitude, longitude)
>>> }
>>> sc.addFile("/home/your_user/GeoLiteCity.dat")
>>>
>>> //load your data in my_data rdd
>>>
>>> my_data.mapPartitions { rows =>
>>> val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
>>> rows.map { row => row ::: parseIP(row(3),ipLookups) }
>>> }
>>>
>>> Le 23/02/2016 14:28, Zhun Shen a écrit :
>>>> Hi all,
>>>>
>>>> Currently, I sent nginx log to Kafka then I want to use Spark Streaming to parse the log and enrich the IP info with geoip libs from Maxmind.
>>>>
>>>> I found this one <https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git>https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git <https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git>, but spark streaming throw error and told that the lib was not Serializable.
>>>>
>>>> Does anyone there way to process the IP info in Spark Streaming? Many thanks.
>>>>
>>>
>>
>
Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming
Posted by Romain Sagean <ro...@hupi.fr>.
it seems like some library are missing. I'm not good at compiling and I
don't know how to use gradle. But for sbt I use sbt-assembly plugin (
https://github.com/sbt/sbt-assembly) to include all dependency and make a
fat jar. For gradle I have found this:
https://github.com/musketyr/gradle-fatjar-plugin.
my complete build.sbt for reference.
import AssemblyKeys._name := "ON-3_geolocation"version :=
"1.0"scalaVersion := "2.10.4"resolvers += "SnowPlow Repo" at
"http://maven.snplow.com/releases/"libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.3.0",
"com.snowplowanalytics" %% "scala-maxmind-iplookups" %
"0.2.0")retrieveManaged := trueassemblySettingsmergeStrategy in
assembly := { case m if m.toLowerCase.endsWith("manifest.mf")
=> MergeStrategy.discard case m if
m.toLowerCase.matches("meta-inf.*\\.sf$") =>
MergeStrategy.discard case "log4j.properties"
=> MergeStrategy.discard case m if
m.toLowerCase.startsWith("meta-inf/services/") =>
MergeStrategy.filterDistinctLines case "reference.conf"
=> MergeStrategy.concat case _
=> MergeStrategy.first}
2016-02-26 7:47 GMT+01:00 Zhun Shen <sh...@gmail.com>:
> Hi,
>
> thanks for you advice. I tried your method, I use Gradle to manage my
> scala code. 'com.snowplowanalytics:scala-maxmind-iplookups:0.2.0’ was
> imported in Gradle.
>
> spark version: 1.6.0
> scala: 2.10.4
> scala-maxmind-iplookups: 0.2.0
>
> I run my test, got the error as below:
> java.lang.NoClassDefFoundError:
> scala/collection/JavaConversions$JMapWrapperLike
> at
> com.snowplowanalytics.maxmind.iplookups.IpLookups$.apply(IpLookups.scala:53)
>
>
>
>
> On Feb 24, 2016, at 1:10 AM, romain sagean <ro...@hupi.fr> wrote:
>
> I realize I forgot the sbt part
>
> resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"
> <http://maven.snplow.com/releases/>
>
> libraryDependencies ++= Seq(
> "org.apache.spark" %% "spark-core" % "1.3.0",
> "com.snowplowanalytics" %% "scala-maxmind-iplookups" % "0.2.0"
> )
>
> otherwise, to process streaming log I use logstash with kafka as input.
> You can set kafka as output if you need to do some extra calculation with
> spark.
>
> Le 23/02/2016 15:07, Romain Sagean a écrit :
>
> Hi,
> I use maxmind geoip with spark (no streaming). To make it work you should
> use mapPartition. I don't know if something similar exist for spark
> streaming.
>
> my code for reference:
>
> def parseIP(ip:String, ipLookups: IpLookups): List[String] = {
> val lookupResult = ipLookups.performLookups(ip)
> val countryName = (lookupResult._1).map(_.countryName).getOrElse("")
> val city = (lookupResult._1).map(_.city).getOrElse(None).getOrElse("")
> val latitude =
> (lookupResult._1).map(_.latitude).getOrElse(None).toString
> val longitude =
> (lookupResult._1).map(_.longitude).getOrElse(None).toString
> return List(countryName, city, latitude, longitude)
> }
> sc.addFile("/home/your_user/GeoLiteCity.dat")
>
> //load your data in my_data rdd
>
> my_data.mapPartitions { rows =>
> val ipLookups = IpLookups(geoFile =
> Some(SparkFiles.get("GeoLiteCity.dat")))
> rows.map { row => row ::: parseIP(row(3),ipLookups) }
> }
>
> Le 23/02/2016 14:28, Zhun Shen a écrit :
>
> Hi all,
>
> Currently, I sent nginx log to Kafka then I want to use Spark Streaming to
> parse the log and enrich the IP info with geoip libs from Maxmind.
>
> I found this one <https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git>
> https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but spark
> streaming throw error and told that the lib was not Serializable.
>
> Does anyone there way to process the IP info in Spark Streaming? Many
> thanks.
>
>
>
>
>
--
*Romain Sagean*
*romain.sagean@hupi.fr <ro...@hupi.fr>*
Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming
Posted by Zhun Shen <sh...@gmail.com>.
Hi,
thanks for you advice. I tried your method, I use Gradle to manage my scala code. 'com.snowplowanalytics:scala-maxmind-iplookups:0.2.0’ was imported in Gradle.
spark version: 1.6.0
scala: 2.10.4
scala-maxmind-iplookups: 0.2.0
I run my test, got the error as below:
java.lang.NoClassDefFoundError: scala/collection/JavaConversions$JMapWrapperLike
at com.snowplowanalytics.maxmind.iplookups.IpLookups$.apply(IpLookups.scala:53)
> On Feb 24, 2016, at 1:10 AM, romain sagean <ro...@hupi.fr> wrote:
>
> I realize I forgot the sbt part
>
> resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/" <http://maven.snplow.com/releases/>
>
> libraryDependencies ++= Seq(
> "org.apache.spark" %% "spark-core" % "1.3.0",
> "com.snowplowanalytics" %% "scala-maxmind-iplookups" % "0.2.0"
> )
>
> otherwise, to process streaming log I use logstash with kafka as input. You can set kafka as output if you need to do some extra calculation with spark.
>
> Le 23/02/2016 15:07, Romain Sagean a écrit :
>> Hi,
>> I use maxmind geoip with spark (no streaming). To make it work you should use mapPartition. I don't know if something similar exist for spark streaming.
>>
>> my code for reference:
>>
>> def parseIP(ip:String, ipLookups: IpLookups): List[String] = {
>> val lookupResult = ipLookups.performLookups(ip)
>> val countryName = (lookupResult._1).map(_.countryName).getOrElse("")
>> val city = (lookupResult._1).map(_.city).getOrElse(None).getOrElse("")
>> val latitude = (lookupResult._1).map(_.latitude).getOrElse(None).toString
>> val longitude = (lookupResult._1).map(_.longitude).getOrElse(None).toString
>> return List(countryName, city, latitude, longitude)
>> }
>> sc.addFile("/home/your_user/GeoLiteCity.dat")
>>
>> //load your data in my_data rdd
>>
>> my_data.mapPartitions { rows =>
>> val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
>> rows.map { row => row ::: parseIP(row(3),ipLookups) }
>> }
>>
>> Le 23/02/2016 14:28, Zhun Shen a écrit :
>>> Hi all,
>>>
>>> Currently, I sent nginx log to Kafka then I want to use Spark Streaming to parse the log and enrich the IP info with geoip libs from Maxmind.
>>>
>>> I found this one <https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git>https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git <https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git>, but spark streaming throw error and told that the lib was not Serializable.
>>>
>>> Does anyone there way to process the IP info in Spark Streaming? Many thanks.
>>>
>>
>
Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming
Posted by romain sagean <ro...@hupi.fr>.
I realize I forgot the sbt part
resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.3.0",
"com.snowplowanalytics" %% "scala-maxmind-iplookups" % "0.2.0"
)
otherwise, to process streaming log I use logstash with kafka as input.
You can set kafka as output if you need to do some extra calculation
with spark.
Le 23/02/2016 15:07, Romain Sagean a écrit :
> Hi,
> I use maxmind geoip with spark (no streaming). To make it work you
> should use mapPartition. I don't know if something similar exist for
> spark streaming.
>
> my code for reference:
>
> def parseIP(ip:String, ipLookups: IpLookups): List[String] = {
> val lookupResult = ipLookups.performLookups(ip)
> val countryName = (lookupResult._1).map(_.countryName).getOrElse("")
> val city = (lookupResult._1).map(_.city).getOrElse(None).getOrElse("")
> val latitude =
> (lookupResult._1).map(_.latitude).getOrElse(None).toString
> val longitude =
> (lookupResult._1).map(_.longitude).getOrElse(None).toString
> return List(countryName, city, latitude, longitude)
> }
> sc.addFile("/home/your_user/GeoLiteCity.dat")
>
> //load your data in my_data rdd
>
> my_data.mapPartitions { rows =>
> val ipLookups = IpLookups(geoFile =
> Some(SparkFiles.get("GeoLiteCity.dat")))
> rows.map { row => row ::: parseIP(row(3),ipLookups) }
> }
>
> Le 23/02/2016 14:28, Zhun Shen a écrit :
>> Hi all,
>>
>> Currently, I sent nginx log to Kafka then I want to use Spark
>> Streaming to parse the log and enrich the IP info with geoip libs
>> from Maxmind.
>>
>> I found this one
>> https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but spark
>> streaming throw error and told that the lib was not Serializable.
>>
>> Does anyone there way to process the IP info in Spark Streaming? Many
>> thanks.
>>
>