Posted to user@spark.apache.org by Raza Rehman <ra...@gmail.com> on 2014/07/10 03:46:36 UTC

Map Function does not seem to be executing over RDD

Hello everyone,

I am having a problem with some simple Scala/Spark code in which I am
trying to replace certain fields in a CSV with their hashes:

class DSV (var line:String="",fieldsList:Seq[Int], var delimiter:String=",") extends Serializable {

        def hash(s:String):String={
                var md = MessageDigest.getInstance("sha")
                md.update(s.getBytes("UTF-8"))

                var digest = md.digest()

                val string:Option[String] = Option(digest).map(Hex.valueOf)

                println("Retuning "+string)
                string.getOrElse("")
        }

        def anonymizeFields(l:String):String ={
                l.split(delimiter,-1).zipWithIndex
                .map {
                        case (str, index) if( fieldsList.contains(index)) =>hash(str)
                        case other => other._1
                }.mkString(delimiter)
        }
}

I am calling the anonymize function like this, but anonData seems to be
the same as the original dsvData:

var dsvData = sc.textFile(inputPath+inputFileName).map(
        line=>(new DSV(line,List(1,2),  "\\|"))
)

println("Lines Processed="+dsvData.count())
var anonData = dsvData.map(l=>l.anonymizeFields(l.line))

println("DSVs Processed="+anonData.count())
anonData.saveAsTextFile(outputPath+outputFileName)

I have tried running it through the shell as well, but the problem persists.
The job does finish, but the worker log shows the following error message:

14/07/09 11:30:20 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@host:60593] -> [akka.tcp://sparkExecutor@host:51397]:
Error [Association failed with [akka.tcp://sparkExecutor@host:51397]] [

Regards
MRK

Re: Map Function does not seem to be executing over RDD

Posted by Yana Kadiyska <ya...@gmail.com>.
Does the line println("Retuning "+string) in the hash function print
what you expect? If you're not seeing that output in the executor log,
I'd also put some debug statements in "case other", since your match in
the "interesting" case is conditioned on
if( fieldsList.contains(index)). Maybe that guard doesn't catch what you
think it should. If that's the case, you can dump out the contents of
fieldsList within the "other" case (i.e. inside the map) and see what's
there.
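
One way to try the check suggested above without a cluster is to run the
field-matching logic on a single line in plain Scala. The sketch below is
only an illustration under a few assumptions: AnonymizeCheck, splitRegex
and joinWith are hypothetical names that do not appear in the original
code, and the hex encoding is hand-rolled because Hex.valueOf comes from a
library not shown in the thread. It also keeps the regex used for
splitting separate from the string used for joining, since
mkString("\\|") would write a literal backslash before each pipe.

```scala
import java.security.MessageDigest

object AnonymizeCheck {
  // Hex-encoded SHA-1 of the input. Stands in for the thread's
  // Hex.valueOf helper, whose library is not shown (assumption).
  def hash(s: String): String =
    MessageDigest.getInstance("SHA-1")
      .digest(s.getBytes("UTF-8"))
      .map(b => "%02x".format(b))
      .mkString

  // Same matching logic as DSV.anonymizeFields, but with the split
  // regex and the output separator passed separately (hypothetical
  // parameters, not part of the original class).
  def anonymizeFields(line: String,
                      fieldsList: Seq[Int],
                      splitRegex: String,
                      joinWith: String): String =
    line.split(splitRegex, -1).zipWithIndex.map {
      case (str, index) if fieldsList.contains(index) => hash(str)
      case (str, _)                                   => str
    }.mkString(joinWith)

  def main(args: Array[String]): Unit = {
    // Fields 1 and 2 should come back as 40-character hex strings.
    println(anonymizeFields("a|b|c|d", List(1, 2), "\\|", "|"))
    // Reusing the regex as the separator leaves "\|" between fields.
    println(anonymizeFields("a|b|c|d", List(1, 2), "\\|", "\\|"))
  }
}
```

If the first line printed here shows hashed fields but the Spark output
does not, the guard itself is probably fine and the difference lies in
how the job is run or which output is being inspected.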
