Posted to dev@mahout.apache.org by Pat Ferrel <pa...@occamsmachete.com> on 2015/04/04 20:28:13 UTC

Re: spark-itemsimilarity IndexException - outside allowable range

I guess you are suggesting to try par(auto=true) 

par is in 

    class DrmLikeOps[K: ClassTag](protected[drm] val drm: DrmLike[K]) {…}

and drmWrap is in 

    package object sparkbindings {…}

I have access to a DrmLike[Int] and a CheckpointedDrm (after the drmWrap).

So I need to make par() public and use it inside drmWrap? Or call it on the DrmLike[Int] with type coercion? The usage is difficult for me to follow since all it does is create an OpPar, and I'm unclear about how that gets used.

def par(min: Int = -1, exact: Int = -1, auto: Boolean = false) = {
  assert(min >= 0 || exact >= 0 || auto, "Invalid argument")
  OpPar(drm, minSplits = min, exactSplits = exact)
}

In the test suite the partitioning is checked, but OpPar is never used to calculate anything, so it's not clear from that.
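To make the question concrete, here is a minimal sketch of what I think the call site would look like, assuming par() were public on DrmLikeOps and picked up through the usual drm implicits (the checkpoint() at the end is what I'd expect to force the optimizer to turn the OpPar plan node into an actual repartition):

    import org.apache.mahout.math.drm._
    import org.apache.mahout.sparkbindings._

    // par() only builds a logical-plan node (OpPar); nothing runs until the
    // pipeline is checkpointed or collected.
    val drmInteractions = drmWrap[Int](indexedInteractions)
    val drmRepar = drmInteractions.par(auto = true).checkpoint()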

My creation of CheckpointedDrm is basically

    ...
    val indexedInteractions =
      interactions.map { case (rowID, columnID) =>
        val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
        val columnIndex = columnIDDictionary_bcast.value.get(columnID).get

        rowIndex -> columnIndex
      }
      // group by IDs to form row vectors
      .groupByKey().map { case (rowIndex, columnIndexes) =>
        val row = new RandomAccessSparseVector(ncol)
        for (columnIndex <- columnIndexes) {
          row.setQuick(columnIndex, 1.0)
        }
        rowIndex -> row
      }
      .asInstanceOf[DrmRdd[Int]]

    // wrap the DrmRdd in a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed
    //val drmInteractions = drmWrap[Int](indexedInteractions, nrow, ncol)
    val drmInteractions = drmWrap[Int](indexedInteractions)

Notice that I tried not setting ncol explicitly, leaving it lazy.
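If the explicit geometry turns out to be the problem, a sketch of what I would try instead (assuming ncol should be exactly the column dictionary size, since all column indexes come from that dictionary):

    // Hypothetical: derive ncol from the broadcast dictionary so it cannot
    // disagree with the column indexes emitted above. An explicit ncol smaller
    // than some emitted index is exactly what would trigger the IndexException
    // seen below.
    val ncolFromDict = columnIDDictionary_bcast.value.size
    val drmInteractionsExplicit = drmWrap[Int](indexedInteractions, ncol = ncolFromDict)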


On Apr 3, 2015, at 2:58 PM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

Keep in mind that we have (do we in the public version?) a par() operator that re-shuffles the original inputs to the desired degree of parallelism (or just does a shuffle-free coalesce). It seems to be pretty effective at addressing skew in my tests.

However, a lot of my upcoming bug fixes address parallelization skews in longer pipelines. I.e., suppose you have 192 cores initially, so you do something like par(auto=true) or par(exact=192), but that only applies to the input. As you follow a longer-running pipeline, the number of tasks may skew very greatly. One of the biggest problems currently, for example, is the use of cartesian() in AB', which produces a quadratic number of tasks. There are issues like this elsewhere, but I guess you are for the most part affected only by the A'A operator, which is not a biggie compared to other things I've tried to do.
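To see the quadratic blowup concretely, here is a plain Spark illustration (nothing Mahout-specific, and the numbers are made up):

    // cartesian() of two RDDs with m and n partitions yields m * n partitions,
    // and therefore m * n tasks in the stage that consumes it.
    val a = sc.parallelize(1 to 1000, numSlices = 192)
    val b = sc.parallelize(1 to 1000, numSlices = 192)
    val ab = a.cartesian(b)
    println(ab.partitions.length)  // 36864 = 192 * 192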

SSVD fortunately does not optimize to use AB', so it is largely not affected by the parallelism skew issue (except there's a bug computing the parallelism # in one of the other operators there).

 


On Fri, Apr 3, 2015 at 1:55 PM, Pat Ferrel <pat@occamsmachete.com> wrote:
In both recent cases, if the files are concatenated the job runs, but since they are created with Spark Streaming you get a bunch of small (maybe even empty) micro-batch part files. So it doesn't seem to be related to memory use but to the number of input files. It's certainly exactly the same data in both cases.

BTW, this situation raises hell with reading text files, which seems to need an optimizer to determine how best to do it. The current Spark text reader needs to get all the status info for all the part files, so it is really slow. It uses this info to do some partitioning. The part files are sometimes empty.

If this is related to empty part files, I can screen those out as a hack.
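The hack would be something like this, assuming a plain sc.textFile read (the path here is made up):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // List the part files up front and drop zero-length ones, then hand Spark a
    // comma-separated path list, which sc.textFile accepts.
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val nonEmptyParts = fs.listStatus(new Path("hdfs:///some/input/dir"))
      .filter(status => status.isFile && status.getLen > 0)
      .map(_.getPath.toString)
    val lines = sc.textFile(nonEmptyParts.mkString(","))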

I had a discussion about how to optimize that on the Spark list and wish they'd do it under the covers.


On Apr 3, 2015, at 1:28 PM, Dmitriy Lyubimov <dlieu.7@gmail.com> wrote:

This stuff carries the assumption that there are no empty partitions.
It also assumes that, if blockified, there's 1 block per partition.
(That is a bad assumption to make in the case of Kryo serialization, because it may (I am not 100% sure) mean that the Kryo buffer must be as big as the whole partition; but it hasn't been a problem so far since our code has been CPU-bound, not so much memory-bound.)

When we bolt on bigger libraries, though, this may become a more prominent issue.

Maybe we should write up a quick RDD validation routine that checks for lengths, empty partitions, etc., which we can use for debugging and unit tests.

We probably don't want to run it inline, though, since it would be an unnecessary expense.
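Something like this quick sketch is what I have in mind (names are made up, debug-only, not an existing API):

    import org.apache.spark.rdd.RDD
    import org.apache.mahout.math.Vector

    // Check a drm-style RDD for empty partitions and for row vectors longer
    // than the declared geometry; both violate the assumptions above.
    def validateDrmRdd(rdd: RDD[(Int, Vector)], ncol: Int): Unit = {
      val stats = rdd.mapPartitionsWithIndex { (part, it) =>
        var rows = 0
        var maxLen = 0
        it.foreach { case (_, v) => rows += 1; maxLen = math.max(maxLen, v.size()) }
        Iterator(part -> (rows, maxLen))
      }.collect()
      stats.foreach { case (part, (rows, maxLen)) =>
        require(rows > 0, s"empty partition $part")
        require(maxLen <= ncol, s"partition $part has a vector of length $maxLen > ncol $ncol")
      }
    }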

-d

On Fri, Apr 3, 2015 at 1:25 PM, Pat Ferrel <pat@occamsmachete.com> wrote:
Sure. Damn red tape.


On Apr 3, 2015, at 1:04 PM, Dmitriy Lyubimov <dlieu.7@gmail.com> wrote:

Sorry about that. It's not me who's doing the red-taping.

My code has diverged too far from the public code, and I don't have time to look at the public version until Sunday night. I can send you some tips Sunday night, ok? It is not a biggie in this case (unlike some other faults). I will rewrite this some time at the end of April, ok?



On Fri, Apr 3, 2015 at 12:58 PM, Pat Ferrel <pat@occamsmachete.com> wrote:
This happens quite often with itemsimilarity. I'd hate to release with no workaround. In fact, I have a client that ran into this today, as well as the guy on the user list.

Can you give me some guidance about a more fundamental fix?


On Apr 3, 2015, at 12:48 PM, Dmitriy Lyubimov <dlieu.7@gmail.com> wrote:

No, I don't think there's a workaround. It needs a fix; however, in the public
version there are many more fixes needed, so I think this part will be
refactored completely in 0.10.1.

On Fri, Apr 3, 2015 at 12:38 PM, Pat Ferrel <pat@occamsmachete.com> wrote:

> OK, it was. Is there a workaround I can try?
>
>
> On Apr 3, 2015, at 12:22 PM, Dmitriy Lyubimov <dlieu.7@gmail.com> wrote:
>
> Although... I am not aware of one in A'A.
>
> It could be a faulty vector length in a matrix, if the matrix was created by
> drmWrap with an explicit specification of ncol.
>
> On Fri, Apr 3, 2015 at 12:20 PM, Dmitriy Lyubimov <dlieu.7@gmail.com>
> wrote:
>
>> It's a bug. There are a number of similar ones in operator A'B.
>>
>> On Fri, Apr 3, 2015 at 6:23 AM, Michael Kelly <michael@onespot.com>
> wrote:
>>
>>> Hi Pat,
>>>
>>> I've done some further digging and it looks like the problem is
>>> occurring when the input files are split up into parts. The input to
>>> the item-similarity job is the output from a Spark job and it ends up
>>> in about 2000 parts (on the Hadoop file system). I have reproduced
>>> the error locally using a small subset of the rows.
>>>
>>> This is a snippet of the file I am using -
>>>
>>> ...
>>>
>>> 5138353282348067470,1891081885
>>> 4417954190713934181,1828065687
>>> 133682221673920382,1454844406
>>> 133682221673920382,1129053737
>>> 133682221673920382,548627241
>>> 133682221673920382,1048452021
>>> 8547417492653230933,1121310481
>>> 7693904559640861382,1333374361
>>> 7204049418352603234,606209305
>>> 139299176617553863,467181330
>>> ...
>>>
>>>
>>> When I run the item-similarity against a single input file which
>>> contains all the rows, the job succeeds without error.
>>>
>>> When I break the input file up into 100 parts and use the directory
>>> containing them as input, I get the 'Index outside allowable range'
>>> exception.
>>>
>>> Here are the input files that I used, tarred and gzipped -
>>>
>>>
>>>
> https://s3.amazonaws.com/static.onespot.com/mahout/passing_single_file.tar.gz
>>>
>>>
> https://s3.amazonaws.com/static.onespot.com/mahout/failing_split_into_100_parts.tar.gz
>>>
>>> There are 44067 rows in total, 11858 unique userIds and 24166 unique
>>> itemIds.
>>>
>>> This is the exception that I see on the 100 part run -
>>>
>>> 15/04/03 12:07:09 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 707)
>>> org.apache.mahout.math.IndexException: Index 24190 is outside allowable range of [0,24166)
>>> at org.apache.mahout.math.AbstractVector.viewPart(AbstractVector.java:147)
>>> at org.apache.mahout.math.scalabindings.VectorOps.apply(VectorOps.scala:37)
>>> at org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:152)
>>> at org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:149)
>>> at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>> at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
>>> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
>>> at scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>> at scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>> at scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:969)
>>> at scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:969)
>>> at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:974)
>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
>>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> I tried splitting the file up into 10, 20 and 50 parts and the job completed.
>>> Also, should the resulting similarity matrix be the same whether the
>>> input is split up or not? I passed in the same random seed for the
>>> Spark job, but the matrices were different.
>>>
>>> Thanks,
>>>
>>> Michael
>>>
>>>
>>>
>>> On Thu, Apr 2, 2015 at 6:56 PM, Pat Ferrel <pat@occamsmachete.com>
> wrote:
>>>> The input must be tuples (if not using a filter), so the CLI you have
>>>> expects user and item ids like:
>>>>
>>>> user-id1,item-id1
>>>> user-id500,item-id3000
>>>> …
>>>>
>>>> The ids must be tokenized because it doesn't use a full CSV parser,
>>>> only lines of delimited text.
>>>>
>>>> If this doesn't help, can you supply a snippet of the input?
>>>>
>>>>
>>>> On Apr 2, 2015, at 10:39 AM, Michael Kelly <michael@onespot.com>
> wrote:
>>>>
>>>> Hi all,
>>>>
>>>> I'm running the spark-itemsimilarity job from the CLI on an AWS EMR
>>>> cluster, and I'm running into an exception.
>>>>
>>>> The input file format is
>>>> UserId<tab>ItemId1<tab>ItemId2<tab>ItemId3......
>>>>
>>>> There is only one row per user, and a total of 97,000 rows.
>>>>
>>>> I also tried input with one row per UserId/ItemId pair, which had
>>>> about 250,000 rows, but I saw a similar exception; this time the
>>>> out-of-bounds index was around 110,000.
>>>>
>>>> The input is stored in HDFS, and this is the command I used to start
>>>> the job -
>>>>
>>>> mahout spark-itemsimilarity --input userItems --output output --master
>>>> yarn-client
>>>>
>>>> Any idea what the problem might be?
>>>>
>>>> Thanks,
>>>>
>>>> Michael
>>>>
>>>>
>>>>
>>>> 15/04/02 16:37:40 WARN TaskSetManager: Lost task 1.0 in stage 10.0 (TID 7631, ip-XX.XX.ec2.internal):
>>>> org.apache.mahout.math.IndexException: Index 22050 is outside allowable range of [0,21997)
>>>>
>>>> org.apache.mahout.math.AbstractVector.viewPart(AbstractVector.java:147)
>>>> org.apache.mahout.math.scalabindings.VectorOps.apply(VectorOps.scala:37)
>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:152)
>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:149)
>>>> scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>>> scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
>>>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
>>>> scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>>> scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>>> scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:969)
>>>> scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:969)
>>>> scala.collection.immutable.StreamIterator.hasNext(Stream.scala:974)
>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:144)
>>>> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>>>> org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>> org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>> java.lang.Thread.run(Thread.java:745)
>>>>
>>>
>>
>>
>
>








Re: spark-itemsimilarity IndexException - outside allowable range

Posted by Pat Ferrel <pa...@occamsmachete.com>.
There are several partitioning methods available on the DrmRdd for specifying a Partitioner or for repartitioning. Might these help? There is no automatic option, AFAICT.

When I “repartition(5)”, given that there were 100 files and I'm running locally, spark-itemsimilarity completes A’A correctly. Is there a better way to choose “5”?
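For now, here is a sketch of how I'd derive the count instead of hard-coding it (assuming the cluster's default parallelism is roughly the right target, which is what I take par(auto=true) to aim at):

    // Hypothetical: size the repartition from default parallelism rather than
    // the magic "5". indexedInteractions is the DrmRdd[Int] built earlier.
    val numParts = math.max(sc.defaultParallelism, 1)
    val drmInteractions = drmWrap[Int](indexedInteractions.repartition(numParts))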

