Posted to user@spark.apache.org by ๏̯͡๏ <ÐΞ€ρ@Ҝ>, de...@gmail.com on 2015/08/06 08:34:22 UTC

Unable to persist RDD to HDFS

Code:
import java.text.SimpleDateFormat
import java.util.Calendar
import java.sql.Date
import org.apache.spark.storage.StorageLevel

def formatStringAsDate(dateStr: String) = new java.sql.Date(new
SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime())

//(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)

case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 : String,
f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11: Float,
f12: Integer, f13: Integer, f14: String)


val rowStructText =
sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00001.gz")

val summary  = rowStructText.filter(s => s.length != 1).map(s =>
s.split("\t")).map(
    {
    s =>
    Summary(formatStringAsDate(s(0)),
            s(1).replaceAll("\"", "").toLong,
            s(3).replaceAll("\"", "").toLong,
            s(4).replaceAll("\"", "").toInt,
            s(5).replaceAll("\"", ""),
            s(6).replaceAll("\"", "").toInt,
            formatStringAsDate(s(7)),
            formatStringAsDate(s(8)),
            s(9).replaceAll("\"", "").toInt,
            s(10).replaceAll("\"", "").toInt,
            s(11).replaceAll("\"", "").toFloat,
            s(12).replaceAll("\"", "").toInt,
            s(13).replaceAll("\"", "").toInt,
            s(14).replaceAll("\"", "")
        )
    }
)

summary.saveAsTextFile("sparkO")


Output:
import java.text.SimpleDateFormat
import java.util.Calendar
import java.sql.Date
import org.apache.spark.storage.StorageLevel
formatStringAsDate: (dateStr: String)java.sql.Date
defined class Summary
rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00001.gz MapPartitionsRDD[639] at textFile at <console>:305
summary: org.apache.spark.rdd.RDD[Summary] = MapPartitionsRDD[642] at map at <console>:310

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 147.0 failed 4 times, most recent failure: Lost task 0.3 in stage 147.0 (TID 3396, datanode-6-3486.phx01.dev.ebayc3.com): java.lang.NumberFormatException: For input string: "3g"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$b7c842676ccaed446a4cace94f9ed$$$$C$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(<console>:318)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$b7c842676ccaed446a4cace94f9ed$$$$C$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(<console>:312)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1072)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)



summary.count also throws the same exception.

Any suggestions?

-- 
Deepak

Re: Unable to persist RDD to HDFS

Posted by Philip Weaver <ph...@gmail.com>.
This isn't really a Spark question. You're trying to parse a string to an
integer, but it contains an invalid character. The exception message
explains this.
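
If it isn't obvious which column carries the bad value, one option is to scan for rows whose integer columns don't parse before building Summary objects. A minimal sketch using scala.util.Try; the parsesAsInt helper and the column list are illustrative (the columns are the ones converted with toInt in the code above), not part of any Spark API:

import scala.util.Try

// Illustrative helper: true when a field, after stripping quotes, is a valid Int.
def parsesAsInt(raw: String): Boolean =
  Try(raw.replaceAll("\"", "").toInt).isSuccess

// The columns that the Summary mapping converts with toInt.
val intCols = Seq(4, 6, 9, 10, 12, 13)

// Keep only the rows where some integer column is missing or malformed
// (e.g. the "3g" value reported in the NumberFormatException), then inspect a few.
val badRows = rowStructText
  .filter(s => s.length != 1)
  .map(s => s.split("\t"))
  .filter(cols => intCols.exists(i => i >= cols.length || !parsesAsInt(cols(i))))

badRows.take(5).foreach(cols => println(cols.mkString("|")))

Once the offending rows are known, they can either be cleaned upstream or dropped with the same predicate before the map that builds Summary.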

On Wed, Aug 5, 2015 at 11:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com> wrote:

> Any suggestions?
>
> --
> Deepak