Posted to user@spark.apache.org by Ted Yu <yu...@gmail.com> on 2014/09/24 18:29:59 UTC

Re: task getting stuck

Adding a subject.

bq.       at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(
ParquetFileReader.java:599)

Looks like there might be some issue reading the Parquet file.

Cheers

On Wed, Sep 24, 2014 at 9:10 AM, Jianshi Huang <ji...@gmail.com>
wrote:

> Hi Ted,
>
> See my previous reply to Debasish, all region servers are idle. I don't
> think it's caused by hotspotting.
>
> Besides, only 6 out of 3000 tasks were stuck, and their inputs are only
> about 80 MB each.
>
> Jianshi
>
> On Wed, Sep 24, 2014 at 11:58 PM, Ted Yu <yu...@gmail.com> wrote:
>
>> I was thinking along the same lines.
>>
>> Jianshi:
>> See
>> http://hbase.apache.org/book.html#d0e6369
>>
>> On Wed, Sep 24, 2014 at 8:56 AM, Debasish Das <de...@gmail.com>
>> wrote:
>>
>>> The HBase region servers need to be balanced....you might have some skew
>>> in your row keys, putting one region server under pressure....try finding
>>> that hot key and spread its writes by prefixing the row key with a random salt.
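[Editor's note: the salting idea above can be sketched as follows. This is a minimal illustration in Python; the bucket count and key format are assumptions, not from the thread.]

```python
# Minimal sketch of row-key salting: prefix each key with a deterministic,
# hash-derived bucket so otherwise-hot keys spread across region servers.
# NUM_SALT_BUCKETS is a hypothetical choice (e.g. one per expected region).
import hashlib

NUM_SALT_BUCKETS = 16

def salted_key(row_key: bytes) -> bytes:
    # Deterministic salt: the same key always maps to the same bucket,
    # so point reads still work; scans must fan out over all buckets.
    bucket = int(hashlib.md5(row_key).hexdigest(), 16) % NUM_SALT_BUCKETS
    return b"%02d-" % bucket + row_key
```

Because the salt is derived from the key itself rather than random per write, writers and readers agree on where a given row lives.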
>>>
>>> On Wed, Sep 24, 2014 at 8:51 AM, Jianshi Huang <ji...@gmail.com>
>>> wrote:
>>>
>>>> Hi Ted,
>>>>
>>>> It converts RDD[Edge] to HBase row keys and columns and inserts them into
>>>> HBase (in batches).
>>>>
>>>> BTW, I found batched Puts are actually faster than generating HFiles...
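[Editor's note: the batching pattern being described might look roughly like this. A hypothetical sketch in Python; the edge fields, batch size, and sink are all made up for illustration and are not the original HbaseRDDBatch code.]

```python
# Hypothetical sketch of batched inserts: convert each edge to a
# (row_key, columns) pair and flush accumulated puts in fixed-size
# batches, so each round trip carries many mutations instead of one.
BATCH_SIZE = 500  # illustrative value

def edge_to_put(src, dst, weight):
    row_key = ("%s|%s" % (src, dst)).encode()
    columns = {b"e:w": str(weight).encode()}
    return row_key, columns

def batch_insert_edges(edges, send_batch):
    buffer = []
    for edge in edges:
        buffer.append(edge_to_put(*edge))
        if len(buffer) >= BATCH_SIZE:
            send_batch(buffer)  # e.g. a multi-Put call on the JVM side
            buffer = []
    if buffer:
        send_batch(buffer)  # flush the remainder
```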
>>>>
>>>>
>>>> Jianshi
>>>>
>>>> On Wed, Sep 24, 2014 at 11:49 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>
>>>>> bq.         at com.paypal.risk.rds.dragon.
>>>>> storage.hbase.HbaseRDDBatch$$anonfun$batchInsertEdges$3.
>>>>> apply(HbaseRDDBatch.scala:179)
>>>>>
>>>>> Can you reveal what HbaseRDDBatch.scala does?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Wed, Sep 24, 2014 at 8:46 AM, Jianshi Huang <
>>>>> jianshi.huang@gmail.com> wrote:
>>>>>
>>>>>> One of my big Spark programs always gets stuck at 99%, where a few
>>>>>> tasks never finish.
>>>>>>
>>>>>> I debugged it by printing out thread stack traces, and found there are
>>>>>> workers stuck in parquet.hadoop.ParquetFileReader.readNextRowGroup.
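[Editor's note: for readers wanting to reproduce this debugging step, on the JVM you would typically run `jstack <pid>` or send SIGQUIT to the executor process. The same idea, purely as an illustration in Python, looks like this.]

```python
# Illustrative only: dump the current stack of every live thread to see
# where workers are stuck. This is the Python analogue of `jstack <pid>`
# on a JVM process.
import sys
import traceback

def dump_all_stacks() -> str:
    out = []
    for thread_id, frame in sys._current_frames().items():
        out.append("Thread %d:\n" % thread_id)
        out.extend(traceback.format_stack(frame))
    return "".join(out)

if __name__ == "__main__":
    print(dump_all_stacks())
```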
>>>>>>
>>>>>> Has anyone had a similar problem? I'm using Spark 1.1.0 built for HDP 2.1.
>>>>>> The Parquet files are generated by Pig using the latest parquet-pig-bundle,
>>>>>> v1.6.0rc1.
>>>>>>
>>>>>> According to Spark 1.1.0's pom.xml, Spark uses Parquet v1.4.3. Will this
>>>>>> version mismatch be a problem?
>>>>>>
>>>>>> One odd thing is that another program, which reads and sorts data from
>>>>>> the same Parquet files, works fine. The only difference seems to be that
>>>>>> the buggy program uses foreachPartition while the working one uses map.
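[Editor's note: the difference can be made concrete with a small sketch, plain Python standing in for the two RDD operations; this is not Spark's actual implementation.]

```python
# map() stays lazy: each element is transformed only as downstream code
# pulls it. foreachPartition() hands the whole partition iterator to a
# side-effecting function, which here drains it eagerly in batches,
# matching the GroupedIterator frames in the stack trace below.
from itertools import islice

def foreach_partition(partition, batch_size, sink):
    it = iter(partition)
    while True:
        batch = list(islice(it, batch_size))  # eagerly pulls from the source
        if not batch:
            break
        sink(batch)

written = []
foreach_partition(range(7), 3, written.append)
# written is now [[0, 1, 2], [3, 4, 5], [6]]
```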
>>>>>>
>>>>>> Here's the full stacktrace:
>>>>>>
>>>>>> "Executor task launch worker-3"
>>>>>>    java.lang.Thread.State: RUNNABLE
>>>>>>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>>>>         at
>>>>>> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:257)
>>>>>>         at
>>>>>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>>>>>>         at
>>>>>> sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
>>>>>>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
>>>>>>         at
>>>>>> org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
>>>>>>         at
>>>>>> org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
>>>>>>         at
>>>>>> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
>>>>>>         at
>>>>>> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)
>>>>>>         at
>>>>>> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)
>>>>>>         at
>>>>>> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)
>>>>>>         at
>>>>>> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
>>>>>>         at
>>>>>> org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:173)
>>>>>>         at
>>>>>> org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:138)
>>>>>>         at
>>>>>> org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:683)
>>>>>>         at
>>>>>> org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:739)
>>>>>>         at
>>>>>> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:796)
>>>>>>         at
>>>>>> org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
>>>>>>         at java.io.DataInputStream.readFully(DataInputStream.java:195)
>>>>>>         at java.io.DataInputStream.readFully(DataInputStream.java:169)
>>>>>>         at
>>>>>> parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599)
>>>>>>         at
>>>>>> parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360)
>>>>>>         at
>>>>>> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
>>>>>>         at
>>>>>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
>>>>>>         at
>>>>>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
>>>>>>         at
>>>>>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:139)
>>>>>>         at
>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>         at
>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>         at
>>>>>> scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>>>>>         at
>>>>>> scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>>>>>         at
>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>         at
>>>>>> scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913)
>>>>>>         at
>>>>>> scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
>>>>>>         at
>>>>>> scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:969)
>>>>>>         at
>>>>>> scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
>>>>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>         at
>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>         at
>>>>>> com.paypal.risk.rds.dragon.storage.hbase.HbaseRDDBatch$$anonfun$batchInsertEdges$3.apply(HbaseRDDBatch.scala:179)
>>>>>>         at
>>>>>> com.paypal.risk.rds.dragon.storage.hbase.HbaseRDDBatch$$anonfun$batchInsertEdges$3.apply(HbaseRDDBatch.scala:167)
>>>>>>         at
>>>>>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:767)
>>>>>>         at
>>>>>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:767)
>>>>>>         at
>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1103)
>>>>>>         at
>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1103)
>>>>>>         at
>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>         at
>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>         at java.lang.Thread.run(Thread.java:724)
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Jianshi Huang
>>>>>>
>>>>>> LinkedIn: jianshi
>>>>>> Twitter: @jshuang
>>>>>> Github & Blog: http://huangjs.github.com/
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Jianshi Huang
>>>>
>>>> LinkedIn: jianshi
>>>> Twitter: @jshuang
>>>> Github & Blog: http://huangjs.github.com/
>>>>
>>>
>>>
>>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>

Re: task getting stuck

Posted by Debasish Das <de...@gmail.com>.
The first test would be whether you can write Parquet files to HDFS from your
Spark job...if that also gets stuck, then there is something wrong with the
logic...if the Parquet files are dumped fine and you can load them into HBase,
then there is something going on with the Spark-HBase interaction.

On Wed, Sep 24, 2014 at 9:41 AM, Debasish Das <de...@gmail.com>
wrote:

> Spark SQL reads Parquet files fine...did you follow one of these to
> read/write Parquet from Spark?
>
> http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/

Re: task getting stuck

Posted by Debasish Das <de...@gmail.com>.
Spark SQL reads Parquet files fine...did you follow one of these to
read/write Parquet from Spark?

http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/

On Wed, Sep 24, 2014 at 9:29 AM, Ted Yu <yu...@gmail.com> wrote:

> Adding a subject.
>
> bq.       at parquet.hadoop.ParquetFileReader$
> ConsecutiveChunkList.readAll(ParquetFileReader.java:599)
>
> Looks like there might be some issue reading the Parquet file.
>
> Cheers