You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Abel Coronado Iruegas <ac...@gmail.com> on 2014/09/15 20:30:31 UTC

Example of Geoprocessing with Spark

Here an example of a working code that takes a csv with lat lon points and
intersects with polygons of municipalities of Mexico, generating a new
version of the file with new attributes.

Do you think that could be improved?

Thanks.

The Code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.geoscript.feature._
import org.geoscript.geometry._
import org.geoscript.geometry.builder._
import com.vividsolutions.jts._
import org.geoscript.layer.Shapefile
import org.geotools.feature.FeatureCollection
import java.text._
import java.util._

object SimpleApp {
        def main(args: Array[String]){
                val conf = new SparkConf().setAppName("Csv Clipper")
                val sc = new SparkContext(conf)
                val csvPath = "hdfs://x01/user/acoronado/mov/movilidad.csv"
//70 Millions of rows
                val csv = sc.textFile(csvPath)
                val clipPoints = csv.map({line: String =>
                                               val Array(usuario, lat, lon,
date) = line.split(",").map(_.trim)
                                               val punto =
Point(lon.toDouble,lat.toDouble)
                                               val existe =
geoData.get.filter(f => f.geometry intersects punto) // Geospatial operation
                                               var cve_est = "0"
                                               var cve_mun = "0"
                                               var time = "0"
                                               if(!existe.isEmpty){
                                                  val f = existe.take(1)
                                                  val ff = f.toList(0)
                                                  cve_est =
ff.getAttribute(1).toString //State Code
                                                  cve_mun =
ff.getAttribute(2).toString  // Municipality Code
                                                  time = (new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$",
"+0000")).getTime().toString()
                                               }

 line+","+time+","+cve_est+","+cve_mun
                                           })

clipPoints.coalesce(1,true).saveAsTextFile("hdfs://m01/user/acoronado/mov/mov_all.csv")
                println("Spark Clip Exito!!!")
        }
        object geoData {
            private val estatal =
Shapefile("/geoData/MunicipiosLatLon.shp") //This directory exist in all
the nodes.
            private val estatalColl = estatal.getFeatures
            def
get:FeatureCollection[org.geoscript.feature.Schema,org.geoscript.feature.Feature]
= estatalColl
        }
}

Re: Example of Geoprocessing with Spark

Posted by Abel Coronado Iruegas <ac...@gmail.com>.
Thanks, Evan and Andy:

Here a very functional version, i need to improve the syntax, but this works
very well, the initial version takes around 36 hours in a 9 machines with 8
cores, and this version takes 36 minutes in a cluster with 7 machines with 8
cores :

object SimpleApp {
        def main(args: Array[String]){
                val conf = new SparkConf().setAppName("Csv Clipper")
                val sc = new SparkContext(conf)
                val csvPath = "hdfs://m01/user/acoronado/mov/movilidad.csv"
                val csv = sc.textFile(csvPath)
                csv.cache()
                val clipPoints = csv.map({line: String =>
                                              val Array(usuario, lat, lon,
date) = line.split(",").map(_.trim)
                                              val geometryFactory =
JTSFactoryFinder.getGeometryFactory();
                                              val reader = new
WKTReader(geometryFactory);
                                              val point = reader.read("POINT
("+lon+" "+ lat + ")" )
                                              val envelope =
point.getEnvelopeInternal
                                              val internal =
geoDataMun.get(envelope)
                                              val (cve_est, cve_mun) =
internal match {
                                                case l => {
                                                                  val existe
= l.find( f => f match { case (g:Geometry,e:String,m:String) =>
g.intersects(point) case _ => false} )
                                                                  existe
match {  
                                                                                 
case Some(t)  => t match { case (g:Geometry,e:String,m:String) => (e,m) case
_ => ("0","0")}
                                                                                 
case None => ("0", "0")
                                                                               
}
                                                                }
                                                case _ => ("0", "0")
                                              }
                                              val time = try {(new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$",
"+0000")).getTime().toString()} catch {case e: Exception => "0"}

                                              
line+","+time+","+cve_est+","+cve_mun
                                })
               
clipPoints.coalesce(5,true).saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_5_parts.csv")
        }
        object geoDataMun {
                      var spatialIndex = new STRtree()
                      val path = new
URL("file:////geoData/MunicipiosLatLon.shp")
                      val store = FileDataStoreFinder.getDataStore(path)
                      val source = store.getFeatureSource();
                      val features = source.getFeatures();
                      val  it = features.features();
                      while(it.hasNext){
                        val feature = it.next()
                        val geom  =  feature.getDefaultGeometry
                        if (geom != null) {
                          val geomClass = geom match {   case g2: Geometry
=> g2  case _ => throw new ClassCastException }
                          val env = geomClass.getEnvelopeInternal();
                          if (!env.isNull) {
                            spatialIndex.insert(env,
(geomClass,feature.getAttribute(1),feature.getAttribute(2)));
                          }
                        }
                      }
                      def get(env:Envelope) =
spatialIndex.query(env).asScala
                }

}




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Example-of-Geoprocessing-with-Spark-tp14274p14752.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Example of Geoprocessing with Spark

Posted by andy petrella <an...@gmail.com>.
It's probably sloooow as you say because it's actually also doing the map
phase that will do the RTree search and so on, and only then saving to hdfs
on 60 partition. If you want to see the time spent in saving to hdfs, you
could do a count for instance before saving. Also saving from 60 partition
might be overkill so what you can do is first recoalescing to the number of
physical nodes that you have (without shuffling).

On the other hand, I don't know if you're running this in a cluster but
geoDataMun looks rather heavy to serialize so it would be preferable to
broadcast it once (since it won't change).

Also, it might only be a syntax improvement but the construction of (cve_est,
cve_mun) is rather long and seems that it can be replaced by these 3 lines
only:

>
>
>
> *val (cve_est, cve_mun) =   internal collectFirst {    case
> (g:Geometry,e:String,m:String) if g.intersects(point) => (e, m)  }
> getOrElse ("0", "0")*


HTH

aℕdy ℙetrella
about.me/noootsab
[image: aℕdy ℙetrella on about.me]

<http://about.me/noootsab>

On Sat, Sep 20, 2014 at 5:50 AM, Abel Coronado Iruegas <
acoronadoiruegas@gmail.com> wrote:

> Hi Evan,
>
> here a improved version, thanks for your advice. But you know the last
> step,
> the SaveAsTextFile is very Sloooow, :(
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
> import java.net.URL
> import java.text.SimpleDateFormat
> import com.vividsolutions.jts.geom._
> import com.vividsolutions.jts.index.strtree.STRtree
> import com.vividsolutions.jts.io._
> import org.geotools.data.FileDataStoreFinder
> import org.geotools.geometry.jts.{JTSFactoryFinder, ReferencedEnvelope}
> import org.opengis.feature.{simple, Feature, FeatureVisitor}
> import scala.collection.JavaConverters._
>
>
> object SimpleApp {
>         def main(args: Array[String]){
>                 val conf = new SparkConf().setAppName("Csv Clipper")
>                 val sc = new SparkContext(conf)
>                 val csvPath = "hdfs://m01/user/acoronado/mov/movilidad.csv"
>                 val csv = sc.textFile(csvPath)
>                 //csv.coalesce(60,true)
>                 csv.cache()
>                 val clipPoints = csv.map({line: String =>
>                                               val Array(usuario, lat, lon,
> date) = line.split(",").map(_.trim)
>                                               val geometryFactory =
> JTSFactoryFinder.getGeometryFactory();
>                                               val reader = new
> WKTReader(geometryFactory);
>                                               val point =
> reader.read("POINT
> ("+lon+" "+ lat + ")" )
>                                               val envelope =
> point.getEnvelopeInternal
>                                               val internal =
> geoDataMun.get(envelope)
>                                               val (cve_est, cve_mun) =
> internal match {
>                                                 case l => {
>                                                                   val
> existe
> = l.find( f => f match { case (g:Geometry,e:String,m:String) =>
> g.intersects(point) case _ => false} )
>                                                                   existe
> match {
>
> case Some(t)  => t match { case (g:Geometry,e:String,m:String) => (e,m)
> case
> _ => ("0","0")}
>
> case None => ("0", "0")
>
> }
>                                                                 }
>                                                 case _ => ("0", "0")
>                                               }
>                                               val time = try {(new
> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$",
> "+0000")).getTime().toString()} catch {case e: Exception => "0"}
>
>
> line+","+time+","+cve_est+","+cve_mun
>                                 })
>
>
> clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_fast.csv")
>         }
>         object geoDataMun {
>                       var spatialIndex = new STRtree()
>                       val path = new
> URL("file:////geoData/MunicipiosLatLon.shp")
>                       val store = FileDataStoreFinder.getDataStore(path)
>                       val source = store.getFeatureSource();
>                       val features = source.getFeatures();
>                       val  it = features.features();
>                       while(it.hasNext){
>                         val feature = it.next()
>                         val geom  =  feature.getDefaultGeometry
>                         if (geom != null) {
>                           val geomClass = geom match {   case g2: Geometry
> => g2  case _ => throw new ClassCastException }
>                           val env = geomClass.getEnvelopeInternal();
>                           if (!env.isNull) {
>                             spatialIndex.insert(env,
> (geomClass,feature.getAttribute(1),feature.getAttribute(2)));
>                           }
>                         }
>                       }
>                       def get(env:Envelope) =
> spatialIndex.query(env).asScala
>                 }
>
> }
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Example-of-Geoprocessing-with-Spark-tp14274p14710.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: Example of Geoprocessing with Spark

Posted by Abel Coronado Iruegas <ac...@gmail.com>.
Hi Evan,

here a improved version, thanks for your advice. But you know the last step,
the SaveAsTextFile is very Sloooow, :(

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import java.net.URL
import java.text.SimpleDateFormat
import com.vividsolutions.jts.geom._
import com.vividsolutions.jts.index.strtree.STRtree
import com.vividsolutions.jts.io._
import org.geotools.data.FileDataStoreFinder
import org.geotools.geometry.jts.{JTSFactoryFinder, ReferencedEnvelope}
import org.opengis.feature.{simple, Feature, FeatureVisitor}
import scala.collection.JavaConverters._


object SimpleApp {
        def main(args: Array[String]){
                val conf = new SparkConf().setAppName("Csv Clipper")
                val sc = new SparkContext(conf)
                val csvPath = "hdfs://m01/user/acoronado/mov/movilidad.csv"
                val csv = sc.textFile(csvPath)
                //csv.coalesce(60,true)
                csv.cache()
                val clipPoints = csv.map({line: String =>
                                              val Array(usuario, lat, lon,
date) = line.split(",").map(_.trim)
                                              val geometryFactory =
JTSFactoryFinder.getGeometryFactory();
                                              val reader = new
WKTReader(geometryFactory);
                                              val point = reader.read("POINT
("+lon+" "+ lat + ")" )
                                              val envelope =
point.getEnvelopeInternal
                                              val internal =
geoDataMun.get(envelope)
                                              val (cve_est, cve_mun) =
internal match {
                                                case l => {
                                                                  val existe
= l.find( f => f match { case (g:Geometry,e:String,m:String) =>
g.intersects(point) case _ => false} )
                                                                  existe
match {
                                                                                 
case Some(t)  => t match { case (g:Geometry,e:String,m:String) => (e,m) case
_ => ("0","0")}
                                                                                 
case None => ("0", "0")
                                                                               
}
                                                                }
                                                case _ => ("0", "0")
                                              }
                                              val time = try {(new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$",
"+0000")).getTime().toString()} catch {case e: Exception => "0"}

                                              
line+","+time+","+cve_est+","+cve_mun
                                })
               
clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_fast.csv")
        }
        object geoDataMun {
                      var spatialIndex = new STRtree()
                      val path = new
URL("file:////geoData/MunicipiosLatLon.shp")
                      val store = FileDataStoreFinder.getDataStore(path)
                      val source = store.getFeatureSource();
                      val features = source.getFeatures();
                      val  it = features.features();
                      while(it.hasNext){
                        val feature = it.next()
                        val geom  =  feature.getDefaultGeometry
                        if (geom != null) {
                          val geomClass = geom match {   case g2: Geometry
=> g2  case _ => throw new ClassCastException }
                          val env = geomClass.getEnvelopeInternal();
                          if (!env.isNull) {
                            spatialIndex.insert(env,
(geomClass,feature.getAttribute(1),feature.getAttribute(2)));
                          }
                        }
                      }
                      def get(env:Envelope) =
spatialIndex.query(env).asScala
                }

}





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Example-of-Geoprocessing-with-Spark-tp14274p14710.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Example of Geoprocessing with Spark

Posted by Evan Chan <ve...@gmail.com>.
Hi Abel,

Pretty interesting.  May I ask how big is your point CSV dataset?

It seems you are relying on searching through the FeatureCollection of
polygons for which one intersects your point.  This is going to be
extremely slow.  I highly recommend using a SpatialIndex, such as the
many that exist in the JTS library itself, to speed things up.

Also, note that the geoscript library is not really maintained
anymore.  I forked it with the intention of maintaining it some more,
but I've decided this is not really a good direction.

On Thu, Sep 18, 2014 at 7:02 PM, Abel Coronado Iruegas
<ac...@gmail.com> wrote:
> Now i have a better version, but now the problem is that the saveAsTextFile
> do not finish the Job, in the hdfs repository only exist a partial temporary
> file, someone can tell me what is wrong:
>
> Thanks !!
>
> object SimpleApp {
>
>         def main(args: Array[String]){
>
>                 val conf = new SparkConf().setAppName("Csv Clipper")
>
>                 val sc = new SparkContext(conf)
>
>                 val csvPath =
> "hdfs://m01/user/acoronado/mov/movilidad_64mb.csv"
>
>                 val csv = sc.textFile(csvPath)
>
>                 csv.cache()
>
>                 val clipPoints = csv.map({line: String =>
>
>                                                val Array(usuario, lat, lon,
> date) = line.split(",").map(_.trim)
>
>                                                val punto =
> Point(lon.toDouble,lat.toDouble)
>
>                                                val internal =
> geoDataExternal.get.find(f => f.geometry intersects punto)
>
>                                                val (cve_est, cve_mun) =
> internal match {
>
>                                        case
> Some(f:org.geoscript.feature.Feature) => {
>
>                                                             val index =
> f.getAttribute(1).toString()
>
>                                                             val existe =
> geoDataMun.get(index).find(f => f.geometry intersects punto)
>
>                                                             existe match {
>
>
> case Some(f) => (f.getAttribute(1).toString, f.getAttribute(2).toString)
>
>
> case None => ("0", "0")
>
>                                                                           }
>
>                                                           }
>
>                                           case None => ("0", "0")
>
>                                         }
>
>                                         val time = try {(new
> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$",
> "+0000")).getTime().toString()} catch {case e: Exception => "0"}
>
>
> line+","+time+","+cve_est+","+cve_mun
>
>                                 })
>
>
> clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_60.csv")
>
>                 println("Spark Clip Exito!!!")
>
>         }
>
>         object geoDataMun {
>
>           private val shp = Shapefile("/geoData/MunicipiosLatLon.shp")
>
>           val features = shp.getFeatures.toIterable
>
>       val result = scala.io.Source.fromFile("/geoData/indice_espacial.csv")
>
>         .getLines()
>
>         .toList map { line: String =>
>
>                                        val campos =
> line.split(",").map(_.trim)
>
>                                        val cve_edo = campos(0)
>
>                                        val cve_mun = campos(1)
>
>                                        val index = campos(2)
>
>
> scala.collection.immutable.List(index.toInt , (cve_edo,cve_mun))
>
>                                     }
>
>       val mapaIx = result.groupBy(x=>x(0)).mapValues(cves => cves.map(x =>
> x(1)))
>
>       def get(index:String) = {
>
>         features.filter(f =>
> mapaIx(index.toInt).contains((f.getAttribute(1).toString,f.getAttribute(2).toString)))
>
>       }
>
>         }
>
>     object geoDataExternal{
>
>       private val shp = Shapefile("/geoData/IndiceRecortado.shp")
>
>       val features = shp.getFeatures
>
>       def get:
> FeatureCollection[org.geoscript.feature.Schema,org.geoscript.feature.Feature]
> = features
>
>     }
>
> }
>
>
> the log of the driver is:
>
> 14/09/18 19:27:55 ERROR EndpointWriter: AssociationError
> [akka.tcp://sparkWorker@axaxaxa-cloudera-s05.xxxnetworks.com:44895] ->
> [akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error
> [Association failed with
> [akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [
>
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]
>
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942
>
> ]
>
> 14/09/18 19:27:55 ERROR EndpointWriter: AssociationError
> [akka.tcp://sparkWorker@axaxaxa-cloudera-s05.xxxnetworks.com:44895] ->
> [akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error
> [Association failed with
> [akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [
>
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]
>
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942
>
> ]
>
> 14/09/18 19:27:55 ERROR EndpointWriter: AssociationError
> [akka.tcp://sparkWorker@axaxaxa-cloudera-s05.xxxnetworks.com:44895] ->
> [akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error
> [Association failed with
> [akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [
>
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]
>
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942
>
>
>
>
> On Mon, Sep 15, 2014 at 1:30 PM, Abel Coronado Iruegas
> <ac...@gmail.com> wrote:
>>
>> Here an example of a working code that takes a csv with lat lon points and
>> intersects with polygons of municipalities of Mexico, generating a new
>> version of the file with new attributes.
>>
>> Do you think that could be improved?
>>
>> Thanks.
>>
>> The Code:
>>
>> import org.apache.spark.SparkContext
>> import org.apache.spark.SparkContext._
>> import org.apache.spark.SparkConf
>> import org.geoscript.feature._
>> import org.geoscript.geometry._
>> import org.geoscript.geometry.builder._
>> import com.vividsolutions.jts._
>> import org.geoscript.layer.Shapefile
>> import org.geotools.feature.FeatureCollection
>> import java.text._
>> import java.util._
>>
>> object SimpleApp {
>>         def main(args: Array[String]){
>>                 val conf = new SparkConf().setAppName("Csv Clipper")
>>                 val sc = new SparkContext(conf)
>>                 val csvPath =
>> "hdfs://x01/user/acoronado/mov/movilidad.csv" //70 Millions of rows
>>                 val csv = sc.textFile(csvPath)
>>                 val clipPoints = csv.map({line: String =>
>>                                                val Array(usuario, lat,
>> lon, date) = line.split(",").map(_.trim)
>>                                                val punto =
>> Point(lon.toDouble,lat.toDouble)
>>                                                val existe =
>> geoData.get.filter(f => f.geometry intersects punto) // Geospatial operation
>>                                                var cve_est = "0"
>>                                                var cve_mun = "0"
>>                                                var time = "0"
>>                                                if(!existe.isEmpty){
>>                                                   val f = existe.take(1)
>>                                                   val ff = f.toList(0)
>>                                                   cve_est =
>> ff.getAttribute(1).toString //State Code
>>                                                   cve_mun =
>> ff.getAttribute(2).toString  // Municipality Code
>>                                                   time = (new
>> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$",
>> "+0000")).getTime().toString()
>>                                                }
>>
>> line+","+time+","+cve_est+","+cve_mun
>>                                            })
>>
>> clipPoints.coalesce(1,true).saveAsTextFile("hdfs://m01/user/acoronado/mov/mov_all.csv")
>>                 println("Spark Clip Exito!!!")
>>         }
>>         object geoData {
>>             private val estatal =
>> Shapefile("/geoData/MunicipiosLatLon.shp") //This directory exist in all the
>> nodes.
>>             private val estatalColl = estatal.getFeatures
>>             def
>> get:FeatureCollection[org.geoscript.feature.Schema,org.geoscript.feature.Feature]
>> = estatalColl
>>         }
>> }
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Example of Geoprocessing with Spark

Posted by Abel Coronado Iruegas <ac...@gmail.com>.
Now i have a better version, but now the problem is that the saveAsTextFile
do not finish the Job, in the hdfs repository only exist a partial
temporary file, someone can tell me what is wrong:

Thanks !!

object SimpleApp {

        def main(args: Array[String]){

                val conf = new SparkConf().setAppName("Csv Clipper")

                val sc = new SparkContext(conf)

                val csvPath =
"hdfs://m01/user/acoronado/mov/movilidad_64mb.csv"

                val csv = sc.textFile(csvPath)

                csv.cache()

                val clipPoints = csv.map({line: String =>

                                               val Array(usuario, lat, lon,
date) = line.split(",").map(_.trim)

                                               val punto =
Point(lon.toDouble,lat.toDouble)

                                               val internal =
geoDataExternal.get.find(f => f.geometry intersects punto)

                                               val (cve_est, cve_mun) =
internal match {

                                       case
Some(f:org.geoscript.feature.Feature) => {

                                                            val index =
f.getAttribute(1).toString()

                                                            val existe =
geoDataMun.get(index).find(f => f.geometry intersects punto)

                                                            existe match {


case Some(f) => (f.getAttribute(1).toString, f.getAttribute(2).toString)


case None => ("0", "0")

                                                                          }

                                                          }

                                          case None => ("0", "0")

                                        }

                                        val time = try {(new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$",
"+0000")).getTime().toString()} catch {case e: Exception => "0"}


line+","+time+","+cve_est+","+cve_mun

                                })

                clipPoints.*saveAsTextFile*
("hdfs://m01/user/acoronado/mov/resultados_movilidad_60.csv")

                println("Spark Clip Exito!!!")

        }

        object geoDataMun {

          private val shp = Shapefile("/geoData/MunicipiosLatLon.shp")

          val features = shp.getFeatures.toIterable

      val result = scala.io.Source.fromFile("/geoData/indice_espacial.csv")

        .getLines()

        .toList map { line: String =>

                                       val campos =
line.split(",").map(_.trim)

                                       val cve_edo = campos(0)

                                       val cve_mun = campos(1)

                                       val index = campos(2)


scala.collection.immutable.List(index.toInt , (cve_edo,cve_mun))

                                    }

      val mapaIx = result.groupBy(x=>x(0)).mapValues(cves => cves.map(x =>
x(1)))

      def get(index:String) = {

        features.filter(f =>
mapaIx(index.toInt).contains((f.getAttribute(1).toString,f.getAttribute(2).toString)))

      }

        }

    object geoDataExternal{

      private val shp = Shapefile("/geoData/IndiceRecortado.shp")

      val features = shp.getFeatures

      def get:
FeatureCollection[org.geoscript.feature.Schema,org.geoscript.feature.Feature]
= features

    }

}


the log of the driver is:

14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp://
sparkWorker@axaxaxa-cloudera-s05.xxxnetworks.com:44895] -> [akka.tcp://
sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error
[Association failed with [akka.tcp://
sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [

akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]

Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942

]

14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp://
sparkWorker@axaxaxa-cloudera-s05.xxxnetworks.com:44895] -> [akka.tcp://
sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error
[Association failed with [akka.tcp://
sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [

akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]

Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942

]

14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp://
sparkWorker@axaxaxa-cloudera-s05.xxxnetworks.com:44895] -> [akka.tcp://
sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error
[Association failed with [akka.tcp://
sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [

akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]

Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942



On Mon, Sep 15, 2014 at 1:30 PM, Abel Coronado Iruegas <
acoronadoiruegas@gmail.com> wrote:

> Here an example of a working code that takes a csv with lat lon points and
> intersects with polygons of municipalities of Mexico, generating a new
> version of the file with new attributes.
>
> Do you think that could be improved?
>
> Thanks.
>
> The Code:
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
> import org.geoscript.feature._
> import org.geoscript.geometry._
> import org.geoscript.geometry.builder._
> import com.vividsolutions.jts._
> import org.geoscript.layer.Shapefile
> import org.geotools.feature.FeatureCollection
> import java.text._
> import java.util._
>
> object SimpleApp {
>         def main(args: Array[String]){
>                 val conf = new SparkConf().setAppName("Csv Clipper")
>                 val sc = new SparkContext(conf)
>                 val csvPath =
> "hdfs://x01/user/acoronado/mov/movilidad.csv" //70 Millions of rows
>                 val csv = sc.textFile(csvPath)
>                 val clipPoints = csv.map({line: String =>
>                                                val Array(usuario, lat,
> lon, date) = line.split(",").map(_.trim)
>                                                val punto =
> Point(lon.toDouble,lat.toDouble)
>                                                val existe =
> geoData.get.filter(f => f.geometry intersects punto) // Geospatial operation
>                                                var cve_est = "0"
>                                                var cve_mun = "0"
>                                                var time = "0"
>                                                if(!existe.isEmpty){
>                                                   val f = existe.take(1)
>                                                   val ff = f.toList(0)
>                                                   cve_est =
> ff.getAttribute(1).toString //State Code
>                                                   cve_mun =
> ff.getAttribute(2).toString  // Municipality Code
>                                                   time = (new
> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$",
> "+0000")).getTime().toString()
>                                                }
>
>  line+","+time+","+cve_est+","+cve_mun
>                                            })
>
> clipPoints.coalesce(1,true).saveAsTextFile("hdfs://m01/user/acoronado/mov/mov_all.csv")
>                 println("Spark Clip Exito!!!")
>         }
>         object geoData {
>             private val estatal =
> Shapefile("/geoData/MunicipiosLatLon.shp") //This directory exist in all
> the nodes.
>             private val estatalColl = estatal.getFeatures
>             def
> get:FeatureCollection[org.geoscript.feature.Schema,org.geoscript.feature.Feature]
> = estatalColl
>         }
> }
>