You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Anurag <an...@gmail.com> on 2014/04/08 22:36:04 UTC
reading custom input format in Spark
Hi,
I am able to read a custom input format in spark.
scala> val inputRead = sc.newAPIHadoopFile("hdfs://127.0.0.1/user/cloudera/date_dataset/", classOf[io.reader.PatternInputFormat], classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.io.Text])
However, calling
inputRead.count()
results in a NullPointerException.
14/04/08 13:33:39 INFO FileInputFormat: Total input paths to process : 1
14/04/08 13:33:39 INFO SparkContext: Starting job: count at <console>:15
14/04/08 13:33:39 INFO DAGScheduler: Got job 8 (count at <console>:15) with
1 output partitions (allowLocal=false)
14/04/08 13:33:39 INFO DAGScheduler: Final stage: Stage 9 (count at
<console>:15)
14/04/08 13:33:39 INFO DAGScheduler: Parents of final stage: List()
14/04/08 13:33:39 INFO DAGScheduler: Missing parents: List()
14/04/08 13:33:39 INFO DAGScheduler: Submitting Stage 9 (NewHadoopRDD[19]
at newAPIHadoopFile at <console>:12), which has no missing parents
14/04/08 13:33:39 INFO DAGScheduler: Submitting 1 missing tasks from Stage
9 (NewHadoopRDD[19] at newAPIHadoopFile at <console>:12)
14/04/08 13:33:39 INFO TaskSchedulerImpl: Adding task set 9.0 with 1 tasks
14/04/08 13:33:39 INFO TaskSetManager: Starting task 9.0:0 as TID 8 on
executor localhost: localhost (PROCESS_LOCAL)
14/04/08 13:33:39 INFO TaskSetManager: Serialized task 9.0:0 as 1297 bytes
in 0 ms
14/04/08 13:33:39 INFO Executor: Running task ID 8
14/04/08 13:33:39 INFO BlockManager: Found block broadcast_5 locally
14/04/08 13:33:39 INFO NewHadoopRDD: Input split: hdfs://127.0.0.1/user/cloudera/date_dataset/sample.txt:0+759
14/04/08 13:33:39 WARN TaskSetManager: Lost TID 8 (task 9.0:0)
14/04/08 13:33:39 WARN TaskSetManager: Loss was due to
java.lang.NullPointerException
java.lang.NullPointerException
at java.util.regex.Pattern.<init>(Pattern.java:1132)
at java.util.regex.Pattern.compile(Pattern.java:823)
at io.reader.PatternRecordReader.initialize(PatternRecordReader.java:42)
at
org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:96)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
14/04/08 13:33:39 ERROR TaskSetManager: Task 9.0:0 failed 1 times; aborting
job
14/04/08 13:33:39 INFO DAGScheduler: Failed to run count at <console>:15
14/04/08 13:33:39 INFO TaskSchedulerImpl: Remove TaskSet 9.0 from pool
14/04/08 13:33:39 ERROR Executor: Exception in task ID 8
java.lang.NullPointerException
at java.util.regex.Pattern.<init>(Pattern.java:1132)
at java.util.regex.Pattern.compile(Pattern.java:823)
at io.reader.PatternRecordReader.initialize(PatternRecordReader.java:42)
at
org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:96)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
org.apache.spark.SparkException: Job aborted: Task 9.0:0 failed 1 times
(most recent failure: Exception failure: java.lang.NullPointerException)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
any idea what might be happening here?
-anurag
--
Twitter: @anuragphadke (https://twitter.com/#!/anuragphadke)
Re: reading custom input format in Spark
Posted by Nick Pentreath <ni...@gmail.com>.
It seems you need to initialise a regex pattern for that InputFormat. How
is this done? Perhaps via a config option?
If so, you need to first create a Hadoop Configuration, set the
appropriate config option for the regex, and pass that into
newAPIHadoopFile.
On Tue, Apr 8, 2014 at 10:36 PM, Anurag <an...@gmail.com> wrote:
> Hi,
> I am able to read a custom input format in spark.
> scala> val inputRead = sc.newAPIHadoopFile("hdfs://
> 127.0.0.1/user/cloudera/date_dataset/
>
> ",classOf[io.reader.PatternInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text])
>
> However, doing a
> inputRead.count()
> results in null pointer exception.
> 14/04/08 13:33:39 INFO FileInputFormat: Total input paths to process : 1
> 14/04/08 13:33:39 INFO SparkContext: Starting job: count at <console>:15
> 14/04/08 13:33:39 INFO DAGScheduler: Got job 8 (count at <console>:15) with
> 1 output partitions (allowLocal=false)
> 14/04/08 13:33:39 INFO DAGScheduler: Final stage: Stage 9 (count at
> <console>:15)
> 14/04/08 13:33:39 INFO DAGScheduler: Parents of final stage: List()
> 14/04/08 13:33:39 INFO DAGScheduler: Missing parents: List()
> 14/04/08 13:33:39 INFO DAGScheduler: Submitting Stage 9 (NewHadoopRDD[19]
> at newAPIHadoopFile at <console>:12), which has no missing parents
> 14/04/08 13:33:39 INFO DAGScheduler: Submitting 1 missing tasks from Stage
> 9 (NewHadoopRDD[19] at newAPIHadoopFile at <console>:12)
> 14/04/08 13:33:39 INFO TaskSchedulerImpl: Adding task set 9.0 with 1 tasks
> 14/04/08 13:33:39 INFO TaskSetManager: Starting task 9.0:0 as TID 8 on
> executor localhost: localhost (PROCESS_LOCAL)
> 14/04/08 13:33:39 INFO TaskSetManager: Serialized task 9.0:0 as 1297 bytes
> in 0 ms
> 14/04/08 13:33:39 INFO Executor: Running task ID 8
> 14/04/08 13:33:39 INFO BlockManager: Found block broadcast_5 locally
> 14/04/08 13:33:39 INFO NewHadoopRDD: Input split: hdfs://
> 127.0.0.1/user/cloudera/date_dataset/sample.txt:0+759
> 14/04/08 13:33:39 WARN TaskSetManager: Lost TID 8 (task 9.0:0)
> 14/04/08 13:33:39 WARN TaskSetManager: Loss was due to
> java.lang.NullPointerException
> java.lang.NullPointerException
> at java.util.regex.Pattern.<init>(Pattern.java:1132)
> at java.util.regex.Pattern.compile(Pattern.java:823)
> at
> io.reader.PatternRecordReader.initialize(PatternRecordReader.java:42)
> at
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:96)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> at org.apache.spark.scheduler.Task.run(Task.scala:53)
> at
>
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> 14/04/08 13:33:39 ERROR TaskSetManager: Task 9.0:0 failed 1 times; aborting
> job
> 14/04/08 13:33:39 INFO DAGScheduler: Failed to run count at <console>:15
> 14/04/08 13:33:39 INFO TaskSchedulerImpl: Remove TaskSet 9.0 from pool
> 14/04/08 13:33:39 ERROR Executor: Exception in task ID 8
> java.lang.NullPointerException
> at java.util.regex.Pattern.<init>(Pattern.java:1132)
> at java.util.regex.Pattern.compile(Pattern.java:823)
> at
> io.reader.PatternRecordReader.initialize(PatternRecordReader.java:42)
> at
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:96)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> at org.apache.spark.scheduler.Task.run(Task.scala:53)
> at
>
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> org.apache.spark.SparkException: Job aborted: Task 9.0:0 failed 1 times
> (most recent failure: Exception failure: java.lang.NullPointerException)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
> at scala.Option.foreach(Option.scala:236)
> at
>
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> any idea what might be happening here?
>
> -anurag
>
>
>
> --
> Twitter: @anuragphadke (https://twitter.com/#!/anuragphadke)
>
Re: reading custom input format in Spark
Posted by Andrew Ash <an...@andrewash.com>.
Anurag,
There is another method called newAPIHadoopRDD that takes in a
Configuration object rather than a path. Give that a shot?
https://spark.apache.org/docs/latest/api/core/index.html#org.apache.spark.SparkContext
On Tue, Apr 8, 2014 at 1:47 PM, Anurag <an...@gmail.com> wrote:
> andrew - yes, i am using the PatternInputFormat from the blog post you
> referenced.
> I know how to set the pattern in configuration while writing a MR job, how
> do i do that from a spark shell?
>
> -anurag
>
>
>
> On Tue, Apr 8, 2014 at 1:41 PM, Andrew Ash <an...@andrewash.com> wrote:
>
> > Are you using the PatternInputFormat from this blog post?
> >
> >
> >
> https://hadoopi.wordpress.com/2013/05/31/custom-recordreader-processing-string-pattern-delimited-records/
> >
> > If so you need to set the pattern in the configuration before attempting
> to
> > read data with that InputFormat:
> >
> > String regex = "^[A-Za-z]{3},\\s\\d{2}\\s[A-Za-z]{3}.*";
> > Configuration conf = new Configuration(true);
> > conf.set("record.delimiter.regex", regex);
> >
> >
> > On Tue, Apr 8, 2014 at 1:36 PM, Anurag <an...@gmail.com> wrote:
> >
> > > Hi,
> > > I am able to read a custom input format in spark.
> > > scala> val inputRead = sc.newAPIHadoopFile("hdfs://
> > > 127.0.0.1/user/cloudera/date_dataset/
> > >
> > >
> >
> ",classOf[io.reader.PatternInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text])
> > >
> > > However, doing a
> > > inputRead.count()
> > > results in null pointer exception.
> > > 14/04/08 13:33:39 INFO FileInputFormat: Total input paths to process :
> 1
> > > 14/04/08 13:33:39 INFO SparkContext: Starting job: count at
> <console>:15
> > > 14/04/08 13:33:39 INFO DAGScheduler: Got job 8 (count at <console>:15)
> > with
> > > 1 output partitions (allowLocal=false)
> > > 14/04/08 13:33:39 INFO DAGScheduler: Final stage: Stage 9 (count at
> > > <console>:15)
> > > 14/04/08 13:33:39 INFO DAGScheduler: Parents of final stage: List()
> > > 14/04/08 13:33:39 INFO DAGScheduler: Missing parents: List()
> > > 14/04/08 13:33:39 INFO DAGScheduler: Submitting Stage 9
> (NewHadoopRDD[19]
> > > at newAPIHadoopFile at <console>:12), which has no missing parents
> > > 14/04/08 13:33:39 INFO DAGScheduler: Submitting 1 missing tasks from
> > Stage
> > > 9 (NewHadoopRDD[19] at newAPIHadoopFile at <console>:12)
> > > 14/04/08 13:33:39 INFO TaskSchedulerImpl: Adding task set 9.0 with 1
> > tasks
> > > 14/04/08 13:33:39 INFO TaskSetManager: Starting task 9.0:0 as TID 8 on
> > > executor localhost: localhost (PROCESS_LOCAL)
> > > 14/04/08 13:33:39 INFO TaskSetManager: Serialized task 9.0:0 as 1297
> > bytes
> > > in 0 ms
> > > 14/04/08 13:33:39 INFO Executor: Running task ID 8
> > > 14/04/08 13:33:39 INFO BlockManager: Found block broadcast_5 locally
> > > 14/04/08 13:33:39 INFO NewHadoopRDD: Input split: hdfs://
> > > 127.0.0.1/user/cloudera/date_dataset/sample.txt:0+759
> > > 14/04/08 13:33:39 WARN TaskSetManager: Lost TID 8 (task 9.0:0)
> > > 14/04/08 13:33:39 WARN TaskSetManager: Loss was due to
> > > java.lang.NullPointerException
> > > java.lang.NullPointerException
> > > at java.util.regex.Pattern.<init>(Pattern.java:1132)
> > > at java.util.regex.Pattern.compile(Pattern.java:823)
> > > at
> > > io.reader.PatternRecordReader.initialize(PatternRecordReader.java:42)
> > > at
> > > org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:96)
> > > at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
> > > at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
> > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> > > at
> > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> > > at org.apache.spark.scheduler.Task.run(Task.scala:53)
> > > at
> > >
> > >
> >
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> > > at
> > >
> >
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
> > > at
> > > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> > > at
> > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> > > at
> > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > > at java.lang.Thread.run(Thread.java:662)
> > > 14/04/08 13:33:39 ERROR TaskSetManager: Task 9.0:0 failed 1 times;
> > aborting
> > > job
> > > 14/04/08 13:33:39 INFO DAGScheduler: Failed to run count at
> <console>:15
> > > 14/04/08 13:33:39 INFO TaskSchedulerImpl: Remove TaskSet 9.0 from pool
> > > 14/04/08 13:33:39 ERROR Executor: Exception in task ID 8
> > > java.lang.NullPointerException
> > > at java.util.regex.Pattern.<init>(Pattern.java:1132)
> > > at java.util.regex.Pattern.compile(Pattern.java:823)
> > > at
> > > io.reader.PatternRecordReader.initialize(PatternRecordReader.java:42)
> > > at
> > > org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:96)
> > > at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
> > > at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
> > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> > > at
> > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> > > at org.apache.spark.scheduler.Task.run(Task.scala:53)
> > > at
> > >
> > >
> >
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> > > at
> > >
> >
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
> > > at
> > > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> > > at
> > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> > > at
> > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > > at java.lang.Thread.run(Thread.java:662)
> > > org.apache.spark.SparkException: Job aborted: Task 9.0:0 failed 1 times
> > > (most recent failure: Exception failure:
> java.lang.NullPointerException)
> > > at
> > >
> > >
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
> > > at
> > >
> > >
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
> > > at
> > >
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > > at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > > at org.apache.spark.scheduler.DAGScheduler.org
> > >
> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
> > > at
> > >
> > >
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
> > > at
> > >
> > >
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
> > > at scala.Option.foreach(Option.scala:236)
> > > at
> > >
> > >
> >
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
> > > at
> > >
> > >
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
> > > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> > > at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> > > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> > > at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> > > at
> > >
> > >
> >
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> > > at
> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > at
> > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > > at
> > >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > at
> > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > >
> > >
> > > any idea what might be happening here?
> > >
> > > -anurag
> > >
> > >
> > >
> > > --
> > > Twitter: @anuragphadke (https://twitter.com/#!/anuragphadke)
> > >
> >
>
>
>
> --
> Twitter: @anuragphadke (https://twitter.com/#!/anuragphadke)
>
Re: reading custom input format in Spark
Posted by Anurag <an...@gmail.com>.
andrew/nick,
thx for the input, got it to work:
sc.hadoopConfiguration.set("record.delimiter.regex", "^[A-Za-z]{3},\\s\\d{2}\\s[A-Za-z]{3}.*")
:-)
-anurag
On Tue, Apr 8, 2014 at 1:47 PM, Anurag <an...@gmail.com> wrote:
> andrew - yes, i am using the PatternInputFormat from the blog post you
> referenced.
> I know how to set the pattern in configuration while writing a MR job, how
> do i do that from a spark shell?
>
> -anurag
>
>
>
> On Tue, Apr 8, 2014 at 1:41 PM, Andrew Ash <an...@andrewash.com> wrote:
>
>> Are you using the PatternInputFormat from this blog post?
>>
>>
>> https://hadoopi.wordpress.com/2013/05/31/custom-recordreader-processing-string-pattern-delimited-records/
>>
>> If so you need to set the pattern in the configuration before attempting
>> to
>> read data with that InputFormat:
>>
>> String regex = "^[A-Za-z]{3},\\s\\d{2}\\s[A-Za-z]{3}.*";
>> Configuration conf = new Configuration(true);
>> conf.set("record.delimiter.regex", regex);
>>
>>
>> On Tue, Apr 8, 2014 at 1:36 PM, Anurag <an...@gmail.com> wrote:
>>
>> > Hi,
>> > I am able to read a custom input format in spark.
>> > scala> val inputRead = sc.newAPIHadoopFile("hdfs://
>> > 127.0.0.1/user/cloudera/date_dataset/
>> >
>> >
>> ",classOf[io.reader.PatternInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text])
>> >
>> > However, doing a
>> > inputRead.count()
>> > results in null pointer exception.
>> > 14/04/08 13:33:39 INFO FileInputFormat: Total input paths to process : 1
>> > 14/04/08 13:33:39 INFO SparkContext: Starting job: count at <console>:15
>> > 14/04/08 13:33:39 INFO DAGScheduler: Got job 8 (count at <console>:15)
>> with
>> > 1 output partitions (allowLocal=false)
>> > 14/04/08 13:33:39 INFO DAGScheduler: Final stage: Stage 9 (count at
>> > <console>:15)
>> > 14/04/08 13:33:39 INFO DAGScheduler: Parents of final stage: List()
>> > 14/04/08 13:33:39 INFO DAGScheduler: Missing parents: List()
>> > 14/04/08 13:33:39 INFO DAGScheduler: Submitting Stage 9
>> (NewHadoopRDD[19]
>> > at newAPIHadoopFile at <console>:12), which has no missing parents
>> > 14/04/08 13:33:39 INFO DAGScheduler: Submitting 1 missing tasks from
>> Stage
>> > 9 (NewHadoopRDD[19] at newAPIHadoopFile at <console>:12)
>> > 14/04/08 13:33:39 INFO TaskSchedulerImpl: Adding task set 9.0 with 1
>> tasks
>> > 14/04/08 13:33:39 INFO TaskSetManager: Starting task 9.0:0 as TID 8 on
>> > executor localhost: localhost (PROCESS_LOCAL)
>> > 14/04/08 13:33:39 INFO TaskSetManager: Serialized task 9.0:0 as 1297
>> bytes
>> > in 0 ms
>> > 14/04/08 13:33:39 INFO Executor: Running task ID 8
>> > 14/04/08 13:33:39 INFO BlockManager: Found block broadcast_5 locally
>> > 14/04/08 13:33:39 INFO NewHadoopRDD: Input split: hdfs://
>> > 127.0.0.1/user/cloudera/date_dataset/sample.txt:0+759
>> > 14/04/08 13:33:39 WARN TaskSetManager: Lost TID 8 (task 9.0:0)
>> > 14/04/08 13:33:39 WARN TaskSetManager: Loss was due to
>> > java.lang.NullPointerException
>> > java.lang.NullPointerException
>> > at java.util.regex.Pattern.<init>(Pattern.java:1132)
>> > at java.util.regex.Pattern.compile(Pattern.java:823)
>> > at
>> > io.reader.PatternRecordReader.initialize(PatternRecordReader.java:42)
>> > at
>> > org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:96)
>> > at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
>> > at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>> > at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>> > at org.apache.spark.scheduler.Task.run(Task.scala:53)
>> > at
>> >
>> >
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>> > at
>> >
>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
>> > at
>> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>> > at
>> >
>> >
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>> > at
>> >
>> >
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>> > at java.lang.Thread.run(Thread.java:662)
>> > 14/04/08 13:33:39 ERROR TaskSetManager: Task 9.0:0 failed 1 times;
>> aborting
>> > job
>> > 14/04/08 13:33:39 INFO DAGScheduler: Failed to run count at <console>:15
>> > 14/04/08 13:33:39 INFO TaskSchedulerImpl: Remove TaskSet 9.0 from pool
>> > 14/04/08 13:33:39 ERROR Executor: Exception in task ID 8
>> > java.lang.NullPointerException
>> > at java.util.regex.Pattern.<init>(Pattern.java:1132)
>> > at java.util.regex.Pattern.compile(Pattern.java:823)
>> > at
>> > io.reader.PatternRecordReader.initialize(PatternRecordReader.java:42)
>> > at
>> > org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:96)
>> > at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
>> > at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>> > at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>> > at org.apache.spark.scheduler.Task.run(Task.scala:53)
>> > at
>> >
>> >
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>> > at
>> >
>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
>> > at
>> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>> > at
>> >
>> >
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>> > at
>> >
>> >
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>> > at java.lang.Thread.run(Thread.java:662)
>> > org.apache.spark.SparkException: Job aborted: Task 9.0:0 failed 1 times
>> > (most recent failure: Exception failure: java.lang.NullPointerException)
>> > at
>> >
>> >
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>> > at
>> >
>> >
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>> > at
>> >
>> >
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> > at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> > at org.apache.spark.scheduler.DAGScheduler.org
>> >
>> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>> > at
>> >
>> >
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>> > at
>> >
>> >
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>> > at scala.Option.foreach(Option.scala:236)
>> > at
>> >
>> >
>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
>> > at
>> >
>> >
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> > at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> > at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> > at
>> >
>> >
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> > at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> > at
>> >
>> >
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> > at
>> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> > at
>> >
>> >
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >
>> >
>> > any idea what might be happening here?
>> >
>> > -anurag
>> >
>> >
>> >
>> > --
>> > Twitter: @anuragphadke (https://twitter.com/#!/anuragphadke)
>> >
>>
>
>
>
> --
> Twitter: @anuragphadke (https://twitter.com/#!/anuragphadke)
>
--
Twitter: @anuragphadke (https://twitter.com/#!/anuragphadke)
Re: reading custom input format in Spark
Posted by Anurag <an...@gmail.com>.
Andrew - yes, I am using the PatternInputFormat from the blog post you
referenced.
I know how to set the pattern in the configuration when writing an MR job; how
do I do that from the Spark shell?
-anurag
On Tue, Apr 8, 2014 at 1:41 PM, Andrew Ash <an...@andrewash.com> wrote:
> Are you using the PatternInputFormat from this blog post?
>
>
> https://hadoopi.wordpress.com/2013/05/31/custom-recordreader-processing-string-pattern-delimited-records/
>
> If so you need to set the pattern in the configuration before attempting to
> read data with that InputFormat:
>
> String regex = "^[A-Za-z]{3},\\s\\d{2}\\s[A-Za-z]{3}.*";
> Configuration conf = new Configuration(true);
> conf.set("record.delimiter.regex", regex);
>
>
> On Tue, Apr 8, 2014 at 1:36 PM, Anurag <an...@gmail.com> wrote:
>
> > Hi,
> > I am able to read a custom input format in spark.
> > scala> val inputRead = sc.newAPIHadoopFile("hdfs://
> > 127.0.0.1/user/cloudera/date_dataset/
> >
> >
> ",classOf[io.reader.PatternInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text])
> >
> > However, doing a
> > inputRead.count()
> > results in null pointer exception.
> > 14/04/08 13:33:39 INFO FileInputFormat: Total input paths to process : 1
> > 14/04/08 13:33:39 INFO SparkContext: Starting job: count at <console>:15
> > 14/04/08 13:33:39 INFO DAGScheduler: Got job 8 (count at <console>:15)
> with
> > 1 output partitions (allowLocal=false)
> > 14/04/08 13:33:39 INFO DAGScheduler: Final stage: Stage 9 (count at
> > <console>:15)
> > 14/04/08 13:33:39 INFO DAGScheduler: Parents of final stage: List()
> > 14/04/08 13:33:39 INFO DAGScheduler: Missing parents: List()
> > 14/04/08 13:33:39 INFO DAGScheduler: Submitting Stage 9 (NewHadoopRDD[19]
> > at newAPIHadoopFile at <console>:12), which has no missing parents
> > 14/04/08 13:33:39 INFO DAGScheduler: Submitting 1 missing tasks from
> Stage
> > 9 (NewHadoopRDD[19] at newAPIHadoopFile at <console>:12)
> > 14/04/08 13:33:39 INFO TaskSchedulerImpl: Adding task set 9.0 with 1
> tasks
> > 14/04/08 13:33:39 INFO TaskSetManager: Starting task 9.0:0 as TID 8 on
> > executor localhost: localhost (PROCESS_LOCAL)
> > 14/04/08 13:33:39 INFO TaskSetManager: Serialized task 9.0:0 as 1297
> bytes
> > in 0 ms
> > 14/04/08 13:33:39 INFO Executor: Running task ID 8
> > 14/04/08 13:33:39 INFO BlockManager: Found block broadcast_5 locally
> > 14/04/08 13:33:39 INFO NewHadoopRDD: Input split: hdfs://
> > 127.0.0.1/user/cloudera/date_dataset/sample.txt:0+759
> > 14/04/08 13:33:39 WARN TaskSetManager: Lost TID 8 (task 9.0:0)
> > 14/04/08 13:33:39 WARN TaskSetManager: Loss was due to
> > java.lang.NullPointerException
> > java.lang.NullPointerException
> > at java.util.regex.Pattern.<init>(Pattern.java:1132)
> > at java.util.regex.Pattern.compile(Pattern.java:823)
> > at
> > io.reader.PatternRecordReader.initialize(PatternRecordReader.java:42)
> > at
> > org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:96)
> > at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
> > at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> > at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> > at org.apache.spark.scheduler.Task.run(Task.scala:53)
> > at
> >
> >
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> > at
> >
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
> > at
> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> > at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> > at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > at java.lang.Thread.run(Thread.java:662)
> > 14/04/08 13:33:39 ERROR TaskSetManager: Task 9.0:0 failed 1 times;
> aborting
> > job
> > 14/04/08 13:33:39 INFO DAGScheduler: Failed to run count at <console>:15
> > 14/04/08 13:33:39 INFO TaskSchedulerImpl: Remove TaskSet 9.0 from pool
> > 14/04/08 13:33:39 ERROR Executor: Exception in task ID 8
> > java.lang.NullPointerException
> > at java.util.regex.Pattern.<init>(Pattern.java:1132)
> > at java.util.regex.Pattern.compile(Pattern.java:823)
> > at
> > io.reader.PatternRecordReader.initialize(PatternRecordReader.java:42)
> > at
> > org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:96)
> > at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
> > at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> > at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> > at org.apache.spark.scheduler.Task.run(Task.scala:53)
> > at
> >
> >
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> > at
> >
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
> > at
> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> > at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> > at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > at java.lang.Thread.run(Thread.java:662)
> > org.apache.spark.SparkException: Job aborted: Task 9.0:0 failed 1 times
> > (most recent failure: Exception failure: java.lang.NullPointerException)
> > at
> >
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
> > at
> >
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
> > at
> >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
> > at
> >
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
> > at
> >
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
> > at scala.Option.foreach(Option.scala:236)
> > at
> >
> >
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
> > at
> >
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> > at
> >
> >
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> > at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >
> >
> > any idea what might be happening here?
> >
> > -anurag
> >
> >
> >
> > --
> > Twitter: @anuragphadke (https://twitter.com/#!/anuragphadke)
> >
>
--
Twitter: @anuragphadke (https://twitter.com/#!/anuragphadke)
Re: reading custom input format in Spark
Posted by Andrew Ash <an...@andrewash.com>.
Are you using the PatternInputFormat from this blog post?
https://hadoopi.wordpress.com/2013/05/31/custom-recordreader-processing-string-pattern-delimited-records/
If so, you need to set the pattern in the configuration before attempting to
read data with that InputFormat:
String regex = "^[A-Za-z]{3},\\s\\d{2}\\s[A-Za-z]{3}.*";
Configuration conf = new Configuration(true);
conf.set("record.delimiter.regex", regex);
On Tue, Apr 8, 2014 at 1:36 PM, Anurag <an...@gmail.com> wrote:
> Hi,
> I am able to read a custom input format in spark.
> scala> val inputRead = sc.newAPIHadoopFile("hdfs://127.0.0.1/user/cloudera/date_dataset/",classOf[io.reader.PatternInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text])
>
> However, doing a
> inputRead.count()
> results in null pointer exception.
> 14/04/08 13:33:39 INFO FileInputFormat: Total input paths to process : 1
> 14/04/08 13:33:39 INFO SparkContext: Starting job: count at <console>:15
> 14/04/08 13:33:39 INFO DAGScheduler: Got job 8 (count at <console>:15) with
> 1 output partitions (allowLocal=false)
> 14/04/08 13:33:39 INFO DAGScheduler: Final stage: Stage 9 (count at
> <console>:15)
> 14/04/08 13:33:39 INFO DAGScheduler: Parents of final stage: List()
> 14/04/08 13:33:39 INFO DAGScheduler: Missing parents: List()
> 14/04/08 13:33:39 INFO DAGScheduler: Submitting Stage 9 (NewHadoopRDD[19]
> at newAPIHadoopFile at <console>:12), which has no missing parents
> 14/04/08 13:33:39 INFO DAGScheduler: Submitting 1 missing tasks from Stage
> 9 (NewHadoopRDD[19] at newAPIHadoopFile at <console>:12)
> 14/04/08 13:33:39 INFO TaskSchedulerImpl: Adding task set 9.0 with 1 tasks
> 14/04/08 13:33:39 INFO TaskSetManager: Starting task 9.0:0 as TID 8 on
> executor localhost: localhost (PROCESS_LOCAL)
> 14/04/08 13:33:39 INFO TaskSetManager: Serialized task 9.0:0 as 1297 bytes
> in 0 ms
> 14/04/08 13:33:39 INFO Executor: Running task ID 8
> 14/04/08 13:33:39 INFO BlockManager: Found block broadcast_5 locally
> 14/04/08 13:33:39 INFO NewHadoopRDD: Input split: hdfs://
> 127.0.0.1/user/cloudera/date_dataset/sample.txt:0+759
> 14/04/08 13:33:39 WARN TaskSetManager: Lost TID 8 (task 9.0:0)
> 14/04/08 13:33:39 WARN TaskSetManager: Loss was due to
> java.lang.NullPointerException
> java.lang.NullPointerException
> at java.util.regex.Pattern.<init>(Pattern.java:1132)
> at java.util.regex.Pattern.compile(Pattern.java:823)
> at
> io.reader.PatternRecordReader.initialize(PatternRecordReader.java:42)
> at
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:96)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> at org.apache.spark.scheduler.Task.run(Task.scala:53)
> at
>
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> 14/04/08 13:33:39 ERROR TaskSetManager: Task 9.0:0 failed 1 times; aborting
> job
> 14/04/08 13:33:39 INFO DAGScheduler: Failed to run count at <console>:15
> 14/04/08 13:33:39 INFO TaskSchedulerImpl: Remove TaskSet 9.0 from pool
> 14/04/08 13:33:39 ERROR Executor: Exception in task ID 8
> java.lang.NullPointerException
> at java.util.regex.Pattern.<init>(Pattern.java:1132)
> at java.util.regex.Pattern.compile(Pattern.java:823)
> at
> io.reader.PatternRecordReader.initialize(PatternRecordReader.java:42)
> at
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:96)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> at org.apache.spark.scheduler.Task.run(Task.scala:53)
> at
>
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> org.apache.spark.SparkException: Job aborted: Task 9.0:0 failed 1 times
> (most recent failure: Exception failure: java.lang.NullPointerException)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
> at scala.Option.foreach(Option.scala:236)
> at
>
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> any idea what might be happening here?
>
> -anurag
>
>
>
> --
> Twitter: @anuragphadke (https://twitter.com/#!/anuragphadke)
>