Posted to dev@spark.apache.org by Li Li <fa...@gmail.com> on 2015/12/28 04:26:43 UTC

running lda in spark throws exception

I ran my LDA example on a YARN 2.6.2 cluster with Spark 1.5.2.
It throws an exception at the line:   Matrix topics = ldaModel.topicsMatrix();
But the YARN job history UI reports the job as successful. What is wrong?
I submit the job with
./bin/spark-submit --class Myclass \
    --master yarn-client \
    --num-executors 2 \
    --driver-memory 4g \
    --executor-memory 4g \
    --executor-cores 1 \


My code:

    corpus.cache();

    // Cluster the documents into topicNumber topics using LDA
    DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA()
        .setOptimizer("em")
        .setMaxIterations(iterNumber)
        .setK(topicNumber)
        .run(corpus);

    // Output topics. Each is a distribution over words (matching word count vectors)
    System.out.println("Learned topics (as distributions over vocab of "
        + ldaModel.vocabSize() + " words):");

    // Line 81, the exception is thrown here:
    Matrix topics = ldaModel.topicsMatrix();
    for (int topic = 0; topic < topicNumber; topic++) {
      System.out.print("Topic " + topic + ":");
      for (int word = 0; word < ldaModel.vocabSize(); word++) {
        System.out.print(" " + topics.apply(word, topic));
      }
      System.out.println();
    }

    ldaModel.save(sc.sc(), modelPath);


Exception in thread "main" java.lang.IndexOutOfBoundsException: (1025,0) not in [-58,58) x [-100,100)
        at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
        at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
        at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
        at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
        at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:81)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

15/12/23 00:01:16 INFO spark.SparkContext: Invoking stop() from shutdown hook

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: running lda in spark throws exception

Posted by Li Li <fa...@gmail.com>.
The data is in the attachment.

On Wed, Dec 30, 2015 at 8:59 PM, Li Li <fa...@gmail.com> wrote:
> I reproduced the problem with a small data set.
> But I am not sure whether my code is correct, because I am not familiar
> with Spark.
> So I am posting my code here first. If it is correct, I will post the data.
> One line of my data looks like this:
>
> {"time":"08-09-17","cmtUrl":"2094361","rvId":"rev_10000020","webpageUrl":"http://www.dianping.com/shop/2094361","word_vec":[0,1,2,3,4,5,6,2,7,8,9,10,11,12,13,14,15,16,8,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,32,35,36,37,38,15,39,40,41,42,5,43,44,17,45,46,42,47,26,48,49]}
>
> It is a JSON file; each line contains webpageUrl and word_vec, which is the
> list of encoded words.
> The first step is to parse the input RDD into an RDD of VectorUrl.
> BTW, is it OK if public VectorUrl call(String s) returns null?
> Then I follow the example and index the documents with unique IDs.
> Then I create an RDD that maps ID to URL, so that after LDA training I can find
> the URL of a document, and I save this RDD to HDFS.
> Then I create the corpus RDD and train.
>
> The exception stack is
>
> 15/12/30 20:45:42 ERROR yarn.ApplicationMaster: User class threw
> exception: java.lang.IndexOutOfBoundsException: (454,0) not in
> [-58,58) x [-100,100)
> java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100)
> at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
> at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
> at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
> at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
> at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:89)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
>
>
> ========== here is my code ==============
>
>     SparkConf conf = new SparkConf().setAppName(ReviewLDA.class.getName());
>     JavaSparkContext sc = new JavaSparkContext(conf);
>
>     // Load and parse the data
>     JavaRDD<String> data = sc.textFile(inputDir + "/*");
>     JavaRDD<VectorUrl> parsedData = data.map(new Function<String, VectorUrl>() {
>       public VectorUrl call(String s) {
>         JsonParser parser = new JsonParser();
>         JsonObject jo = parser.parse(s).getAsJsonObject();
>         if (!jo.has("word_vec") || !jo.has("webpageUrl")) {
>           return null;
>         }
>         JsonArray word_vec = jo.get("word_vec").getAsJsonArray();
>         String url = jo.get("webpageUrl").getAsString();
>         double[] values = new double[word_vec.size()];
>         for (int i = 0; i < values.length; i++)
>           values[i] = word_vec.get(i).getAsInt();
>         return new VectorUrl(Vectors.dense(values), url);
>       }
>     });
>
>     // Index documents with unique IDs
>     JavaPairRDD<Long, VectorUrl> id2doc = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
>         new Function<Tuple2<VectorUrl, Long>, Tuple2<Long, VectorUrl>>() {
>           public Tuple2<Long, VectorUrl> call(Tuple2<VectorUrl, Long> doc_id) {
>             return doc_id.swap();
>           }
>         }));
>
>     // Map each document ID to its URL and save the mapping
>     JavaPairRDD<Long, String> id2Url = JavaPairRDD.fromJavaRDD(id2doc
>         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, String>>() {
>           @Override
>           public Tuple2<Long, String> call(Tuple2<Long, VectorUrl> id2doc) throws Exception {
>             return new Tuple2<Long, String>(id2doc._1, id2doc._2.url);
>           }
>         }));
>     id2Url.saveAsTextFile(id2UrlPath);
>
>     // Build the corpus of (document ID, vector) pairs
>     JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(id2doc
>         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, Vector>>() {
>           @Override
>           public Tuple2<Long, Vector> call(Tuple2<Long, VectorUrl> id2doc) throws Exception {
>             return new Tuple2<Long, Vector>(id2doc._1, id2doc._2.vec);
>           }
>         }));
>     corpus.cache();
>
>     // Cluster the documents into topicNumber topics using LDA
>     DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA()
>         .setMaxIterations(iterNumber)
>         .setK(topicNumber)
>         .run(corpus);
>
>
> On Wed, Dec 30, 2015 at 3:34 PM, Li Li <fa...@gmail.com> wrote:
>> I will try with a portion of the data. Can the HDFS block layout affect
>> Spark? (If so, it will be hard to reproduce.)
>>
>> On Wed, Dec 30, 2015 at 3:22 AM, Joseph Bradley <jo...@databricks.com> wrote:
>>> Hi Li,
>>>
>>> I'm wondering if you're running into the same bug reported here:
>>> https://issues.apache.org/jira/browse/SPARK-12488
>>>
>>> I haven't figured out yet what is causing it.  Do you have a small corpus
>>> which reproduces this error, and which you can share on the JIRA?  If so,
>>> that would help a lot in debugging this failure.
>>>
>>> Thanks!
>>> Joseph
>>>

Re: running lda in spark throws exception

Posted by Li Li <fa...@gmail.com>.
I am running it on 1.5.2. I will try running it on a small standalone
cluster to see whether it works there.

On Sat, Jan 9, 2016 at 6:21 AM, Bryan Cutler <cu...@gmail.com> wrote:
> Hi Li,
>
> I tried out your code and sample data in both local mode and Spark
> Standalone and it ran correctly with output that looks good.  Sorry, I don't
> have a YARN cluster setup right now, so maybe the error you are seeing is
> specific to that.  Btw, I am running the latest Spark code from the master
> branch.  Hope that helps some!
>
> Bryan
>
> On Mon, Jan 4, 2016 at 8:42 PM, Li Li <fa...@gmail.com> wrote:
>>
>> Could anyone help? The problem is very easy to reproduce. What's wrong?


Re: running lda in spark throws exception

Posted by Li Li <fa...@gmail.com>.
I got it. I mistakenly thought that each line should be a list of word IDs.

On Fri, Jan 15, 2016 at 3:24 AM, Bryan Cutler <cu...@gmail.com> wrote:
> What I mean is the input to LDA.run() is a RDD[(Long, Vector)] and the
> Vector is a vector of counts of each term and should be the same size as the
> vocabulary (so if the vocabulary, or dictionary has 10 words, each vector
> should have a size of 10).  This probably means that there will be some
> elements with zero counts, and a sparse vector might be a good way to handle
> that.
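
A minimal sketch of the conversion described above, in plain Java (Java 8 or later) with no Spark dependency. It turns one document's list of word IDs into sorted indices and their counts. The names SparseTermCounts and fromWordIds are hypothetical and not part of Spark; the assumption is that the two arrays would then be passed to Vectors.sparse(vocabSize, indices, values) from org.apache.spark.mllib.linalg to build the vector given to LDA.run().

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: sparse term counts for one document. */
final class SparseTermCounts {
    final int[] indices;   // distinct word IDs, in ascending order
    final double[] values; // values[i] = number of occurrences of indices[i]

    private SparseTermCounts(int[] indices, double[] values) {
        this.indices = indices;
        this.values = values;
    }

    static SparseTermCounts fromWordIds(int[] wordIds, int vocabSize) {
        // TreeMap keeps the word IDs sorted, which a sparse vector needs.
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int id : wordIds) {
            if (id < 0 || id >= vocabSize) {
                throw new IllegalArgumentException(
                    "word ID " + id + " is outside [0, " + vocabSize + ")");
            }
            counts.merge(id, 1, Integer::sum);
        }
        int[] indices = new int[counts.size()];
        double[] values = new double[counts.size()];
        int i = 0;
        for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
            indices[i] = e.getKey();
            values[i] = e.getValue();
            i++;
        }
        return new SparseTermCounts(indices, values);
    }

    public static void main(String[] args) {
        // A document containing the word IDs 0 1 2 2 8 8 8, vocabulary of 10 words.
        SparseTermCounts doc = fromWordIds(new int[] {0, 1, 2, 2, 8, 8, 8}, 10);
        System.out.println(Arrays.toString(doc.indices)); // [0, 1, 2, 8]
        System.out.println(Arrays.toString(doc.values));  // [1.0, 1.0, 2.0, 3.0]
    }
}
```

Every document then has the same logical size (vocabSize), however many distinct words it contains.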
>
> On Wed, Jan 13, 2016 at 6:40 PM, Li Li <fa...@gmail.com> wrote:
>>
>> It looks like the problem is the vectors of term counts in the corpus
>> are not always the vocabulary size.
>> Do you mean that some integers do not occur in the corpus?
>> For example, my dictionary is 0 - 9 (10 words in total).
>> The docs are:
>> 0 2 4 6 8
>> 1 3 5 7 9
>> Then it will be correct.
>> If the docs are:
>> 0 2 4 6 9
>> 1 3 5 6 7
>> 8 does not occur in any document. Will it then be wrong?
>>
>> So the workaround is to process the input to re-encode terms?
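
To make the example above concrete, here is a small sketch (plain Java, no Spark dependency; DenseTermCounts and toCounts are hypothetical names) that builds count vectors for the second pair of documents with the 10-word dictionary 0 - 9. Assuming every vector has the vocabulary size, as described elsewhere in this thread, a word that never occurs, such as 8, simply has a zero count in every document, so re-encoding the terms should not be necessary.

```java
import java.util.Arrays;

/** Hypothetical helper: dense term counts for one document. */
final class DenseTermCounts {
    static double[] toCounts(int[] wordIds, int vocabSize) {
        double[] counts = new double[vocabSize]; // every entry starts at 0.0
        for (int id : wordIds) {
            // Throws ArrayIndexOutOfBoundsException if an ID is outside the vocabulary.
            counts[id] += 1.0;
        }
        return counts;
    }

    public static void main(String[] args) {
        int vocabSize = 10; // dictionary 0 - 9
        // Prints [1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 1.0]
        System.out.println(Arrays.toString(toCounts(new int[] {0, 2, 4, 6, 9}, vocabSize)));
        // Prints [0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0]
        System.out.println(Arrays.toString(toCounts(new int[] {1, 3, 5, 6, 7}, vocabSize)));
    }
}
```

Both vectors have length 10, and position 8 is 0.0 in each of them.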
>>
>> On Thu, Jan 14, 2016 at 6:53 AM, Bryan Cutler <cu...@gmail.com> wrote:
>> > I was now able to reproduce the exception using the master branch and local
>> > mode.  It looks like the problem is the vectors of term counts in the corpus
>> > are not always the vocabulary size.  Once I padded these with zero counts to
>> > the vocab size, it ran without the exception.
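
The padding step mentioned above could look roughly like this. It is a sketch in plain Java, not the code that was actually used; PadToVocab and pad are hypothetical names, and vocabSize is assumed to be known in advance (for example, the largest word ID plus one).

```java
import java.util.Arrays;

/** Hypothetical helper: pad a term-count vector with zeros to the vocabulary size. */
final class PadToVocab {
    static double[] pad(double[] counts, int vocabSize) {
        if (counts.length > vocabSize) {
            throw new IllegalArgumentException(
                "vector of length " + counts.length + " exceeds vocabulary size " + vocabSize);
        }
        // Arrays.copyOf copies the existing entries and fills the tail with 0.0.
        return Arrays.copyOf(counts, vocabSize);
    }

    public static void main(String[] args) {
        double[] padded = pad(new double[] {2.0, 0.0, 1.0}, 5);
        System.out.println(Arrays.toString(padded)); // [2.0, 0.0, 1.0, 0.0, 0.0]
    }
}
```

Padding only makes the sizes consistent; the entries still have to be term counts rather than word IDs.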
>> >
>> > Joseph, I also tried calling describeTopics and noticed that with the
>> > improper vector size, it will not throw an exception but the term indices
>> > will start to be incorrect.  For a small number of iterations, it is ok, but
>> > increasing iterations causes the indices to get larger also.  Maybe that is
>> > what is going on in the JIRA you linked to?
>> >
>> > On Wed, Jan 13, 2016 at 1:17 AM, Li Li <fa...@gmail.com> wrote:
>> >>
>> >> I will try Spark 1.6.0 to see whether it is a bug in 1.5.2.
>> >>
>> >> On Wed, Jan 13, 2016 at 3:58 PM, Li Li <fa...@gmail.com> wrote:
>> >> > I have set up a standalone Spark cluster and used the same code. It
>> >> > still failed with the same exception.
>> >> > I also preprocessed the data into lines of integers and used the Scala
>> >> > code of the LDA example. It still failed.
>> >> > The code:
>> >> >
>> >> > import org.apache.spark.mllib.clustering.{ LDA, DistributedLDAModel }
>> >> > import org.apache.spark.mllib.linalg.Vectors
>> >> > import org.apache.spark.SparkContext
>> >> > import org.apache.spark.SparkContext._
>> >> > import org.apache.spark.SparkConf
>> >> >
>> >> > object TestLDA {
>> >> >   def main(args: Array[String]) {
>> >> >     if (args.length != 4) {
>> >> >       // Argument order matches the submit script: input, iterations, topics, output
>> >> >       println("need 4 args: inDir iterNum topicNum outDir")
>> >> >       System.exit(-1)
>> >> >     }
>> >> >     val conf = new SparkConf().setAppName("TestLDA")
>> >> >     val sc = new SparkContext(conf)
>> >> >
>> >> >     // Load and parse the data
>> >> >     val data = sc.textFile(args(0))
>> >> >     val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
>> >> >
>> >> >     // Index documents with unique IDs
>> >> >     val corpus = parsedData.zipWithIndex.map(_.swap).cache()
>> >> >     val topicNum = Integer.valueOf(args(2))
>> >> >     val iterNum = Integer.valueOf(args(1))
>> >> >
>> >> >     // Cluster the documents into topicNum topics using LDA
>> >> >     val ldaModel = new LDA().setK(topicNum).setMaxIterations(iterNum).run(corpus)
>> >> >
>> >> >     // Output topics. Each is a distribution over words (matching word count vectors)
>> >> >     println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
>> >> >     val topics = ldaModel.topicsMatrix
>> >> >     for (topic <- Range(0, topicNum)) {
>> >> >       print("Topic " + topic + ":")
>> >> >       for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); }
>> >> >       println()
>> >> >     }
>> >> >
>> >> >     // Save the model to the output directory (fourth argument)
>> >> >     ldaModel.save(sc, args(3))
>> >> >   }
>> >> > }
>> >> >
>> >> > Script to submit:
>> >> >
>> >> > ~/spark-1.5.2-bin-hadoop2.6/bin/spark-submit --class com.mobvoi.knowledgegraph.scala_test.TestLDA \
>> >> >     --master spark://master:7077 \
>> >> >     --num-executors 10 \
>> >> >     --executor-memory 4g \
>> >> >     --executor-cores 3 \
>> >> >     scala_test-1.0-jar-with-dependencies.jar \
>> >> >     /test.txt \
>> >> >     100 \
>> >> >     5  \
>> >> >     /lda_model
>> >> >
>> >> > test.txt is in the attachment.
>> >> >
>> >> >
>> >> > On Sat, Jan 9, 2016 at 6:21 AM, Bryan Cutler <cu...@gmail.com>
>> >> > wrote:
>> >> >> Hi Li,
>> >> >>
>> >> >> I tried out your code and sample data in both local mode and Spark
>> >> >> Standalone and it ran correctly with output that looks good.  Sorry,
>> >> >> I
>> >> >> don't
>> >> >> have a YARN cluster setup right now, so maybe the error you are
>> >> >> seeing
>> >> >> is
>> >> >> specific to that.  Btw, I am running the latest Spark code from the
>> >> >> master
>> >> >> branch.  Hope that helps some!
>> >> >>
>> >> >> Bryan
>> >> >>
>> >> >> On Mon, Jan 4, 2016 at 8:42 PM, Li Li <fa...@gmail.com> wrote:
>> >> >>>
>> >> >>> anyone could help? the problem is very easy to reproduce. What's
>> >> >>> wrong?
>> >> >>>
>> >> >>> On Wed, Dec 30, 2015 at 8:59 PM, Li Li <fa...@gmail.com> wrote:
>> >> >>> > I use a small data and reproduce the problem.
>> >> >>> > But I don't know my codes are correct or not because I am not
>> >> >>> > familiar
>> >> >>> > with spark.
>> >> >>> > So I first post my codes here. If it's correct, then I will post
>> >> >>> > the
>> >> >>> > data.
>> >> >>> > one line of my data like:
>> >> >>> >
>> >> >>> > { "time":"08-09-17","cmtUrl":"2094361"
>> >> >>> >
>> >> >>> >
>> >> >>> >
>> >> >>> > ,"rvId":"rev_10000020","webpageUrl":"http://www.dianping.com/shop/2094361","word_vec":[0,1,2,3,4,5,6,2,7,8,9
>> >> >>> >
>> >> >>> >
>> >> >>> >
>> >> >>> > ,10,11,12,13,14,15,16,8,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,32,35,36,37,38,15,39,40,41,42,5,43,44,17,45,46,42,47,26,48,49]}
>> >> >>> >
>> >> >>> > it's a json file which contains webpageUrl and word_vec which is
>> >> >>> > the
>> >> >>> > encoded words.
>> >> >>> > The first step is to prase the input rdd to a rdd of VectorUrl.
>> >> >>> > BTW, if public VectorUrl call(String s) return null, is it ok?
>> >> >>> > Then follow the example Index documents with unique IDs
>> >> >>> > Then I create a rdd to map id to url so after lda training, I can
>> >> >>> > find
>> >> >>> > the url of the document. Then save this rdd to hdfs.
>> >> >>> > Then create corpus rdd and train
>> >> >>> >
>> >> >>> > The exception stack is
>> >> >>> >
>> >> >>> > 15/12/30 20:45:42 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100)
>> >> >>> > java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100)
>> >> >>> >   at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
>> >> >>> >   at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
>> >> >>> >   at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
>> >> >>> >   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> >> >>> >   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> >> >>> >   at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
>> >> >>> >   at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
>> >> >>> >   at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:89)
>> >> >>> >   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >> >>> >   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >> >>> >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >> >>> >   at java.lang.reflect.Method.invoke(Method.java:606)
>> >> >>> >   at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
>> >> >>> >
>> >> >>> >
>> >> >>> > ==========here is my codes==============
>> >> >>> >
>> >> >>> >     SparkConf conf = new SparkConf().setAppName(ReviewLDA.class.getName());
>> >> >>> >     JavaSparkContext sc = new JavaSparkContext(conf);
>> >> >>> >
>> >> >>> >     // Load and parse the data
>> >> >>> >     JavaRDD<String> data = sc.textFile(inputDir + "/*");
>> >> >>> >     JavaRDD<VectorUrl> parsedData = data.map(new Function<String, VectorUrl>() {
>> >> >>> >       public VectorUrl call(String s) {
>> >> >>> >         JsonParser parser = new JsonParser();
>> >> >>> >         JsonObject jo = parser.parse(s).getAsJsonObject();
>> >> >>> >         if (!jo.has("word_vec") || !jo.has("webpageUrl")) {
>> >> >>> >           return null;
>> >> >>> >         }
>> >> >>> >         JsonArray word_vec = jo.get("word_vec").getAsJsonArray();
>> >> >>> >         String url = jo.get("webpageUrl").getAsString();
>> >> >>> >         double[] values = new double[word_vec.size()];
>> >> >>> >         for (int i = 0; i < values.length; i++)
>> >> >>> >           values[i] = word_vec.get(i).getAsInt();
>> >> >>> >         return new VectorUrl(Vectors.dense(values), url);
>> >> >>> >       }
>> >> >>> >     });
>> >> >>> >
>> >> >>> >     // Index documents with unique IDs
>> >> >>> >     JavaPairRDD<Long, VectorUrl> id2doc = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
>> >> >>> >         new Function<Tuple2<VectorUrl, Long>, Tuple2<Long, VectorUrl>>() {
>> >> >>> >           public Tuple2<Long, VectorUrl> call(Tuple2<VectorUrl, Long> doc_id) {
>> >> >>> >             return doc_id.swap();
>> >> >>> >           }
>> >> >>> >         }));
>> >> >>> >
>> >> >>> >     JavaPairRDD<Long, String> id2Url = JavaPairRDD.fromJavaRDD(id2doc
>> >> >>> >         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, String>>() {
>> >> >>> >           @Override
>> >> >>> >           public Tuple2<Long, String> call(Tuple2<Long, VectorUrl> id2doc) throws Exception {
>> >> >>> >             return new Tuple2(id2doc._1, id2doc._2.url);
>> >> >>> >           }
>> >> >>> >         }));
>> >> >>> >
>> >> >>> >     id2Url.saveAsTextFile(id2UrlPath);
>> >> >>> >
>> >> >>> >     JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(id2doc
>> >> >>> >         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, Vector>>() {
>> >> >>> >           @Override
>> >> >>> >           public Tuple2<Long, Vector> call(Tuple2<Long, VectorUrl> id2doc) throws Exception {
>> >> >>> >             return new Tuple2(id2doc._1, id2doc._2.vec);
>> >> >>> >           }
>> >> >>> >         }));
>> >> >>> >
>> >> >>> >     corpus.cache();
>> >> >>> >
>> >> >>> >     // Cluster the documents into three topics using LDA
>> >> >>> >     DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA().setMaxIterations(iterNumber)
>> >> >>> >         .setK(topicNumber).run(corpus);
>> >> >>> >
>> >> >>> > On Wed, Dec 30, 2015 at 3:34 PM, Li Li <fa...@gmail.com> wrote:
>> >> >>> >> I will try with a portion of the data. Will the HDFS blocks affect
>> >> >>> >> Spark? (If so, it will be hard to reproduce.)
>> >> >>> >>
>> >> >>> >> On Wed, Dec 30, 2015 at 3:22 AM, Joseph Bradley <jo...@databricks.com> wrote:
>> >> >>> >>> Hi Li,
>> >> >>> >>>
>> >> >>> >>> I'm wondering if you're running into the same bug reported here:
>> >> >>> >>> https://issues.apache.org/jira/browse/SPARK-12488
>> >> >>> >>>
>> >> >>> >>> I haven't figured out yet what is causing it.  Do you have a small
>> >> >>> >>> corpus which reproduces this error, and which you can share on the
>> >> >>> >>> JIRA?  If so, that would help a lot in debugging this failure.
>> >> >>> >>>
>> >> >>> >>> Thanks!
>> >> >>> >>> Joseph
>> >> >>> >>>
>> >> >>> >>>
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>
>> >
>> >
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: running lda in spark throws exception

Posted by Joseph Bradley <jo...@databricks.com>.
It's possible this was caused by incorrect Graph creation, fixed in
[SPARK-13355].

Could you retry your dataset using the current master to see if the problem
is fixed?  Thanks!

On Tue, Jan 19, 2016 at 5:31 AM, Li Li <fa...@gmail.com> wrote:

> I have modified my code. I now get the total vocabulary size, the
> index array, and the frequency array from the JSON object.
>
>         JsonArray idxArr = jo.get("idxArr").getAsJsonArray();
>         JsonArray freqArr = jo.get("freqArr").getAsJsonArray();
>         int total = jo.get("vocabSize").getAsInt();
>         String url = jo.get("webpageUrl").getAsString();
>         int[] idx = new int[idxArr.size()];
>         double[] freq = new double[freqArr.size()];
>         for (int i = 0; i < idxArr.size(); i++) {
>           idx[i] = idxArr.get(i).getAsInt();
>           freq[i] = freqArr.get(i).getAsDouble();
>         }
>
>         return new VectorUrl(Vectors.sparse(total, idx, freq), url);
>
> But when I run it, it throws an exception: Job aborted due to stage
> failure: Total size of serialized results of 22 tasks (3.1 GB) is
> bigger than spark.driver.maxResultSize (3.0 GB)
> I have set the max result size to 3g, but it still says that is not enough.
>        conf.set("spark.driver.maxResultSize", "3g");
> How much memory will it use?
> I use the following script to submit the job to the YARN cluster.
>
> bin/spark-submit --class xxx.yyyy.ReviewLDA \
>     --master yarn-cluster \
>     --num-executors 10 \
>     --driver-memory 4g \
>     --executor-memory 4g \
>     --executor-cores 2
>
> On Fri, Jan 15, 2016 at 3:24 AM, Bryan Cutler <cu...@gmail.com> wrote:
> > What I mean is the input to LDA.run() is a RDD[(Long, Vector)] and the
> > Vector is a vector of counts of each term and should be the same size as
> > the vocabulary (so if the vocabulary, or dictionary has 10 words, each
> > vector should have a size of 10).  This probably means that there will be
> > some elements with zero counts, and a sparse vector might be a good way to
> > handle that.
> >
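
A minimal sketch of the conversion Bryan describes, in plain Java without Spark on the classpath. It turns one document's list of term ids (like the word_vec array in the sample data) into sorted indices and per-term counts. The class TermCounts and its method names are hypothetical, and the vocabulary size of 50 is assumed only for illustration. With MLlib available, the two arrays would be passed to Vectors.sparse(vocabSize, indices, counts), the same call used in the modified code quoted at the top of this message. For what it's worth, the sample word_vec line has 58 entries, which matches the 58 in the row bound of the exception.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Spark: holds the (indices, counts)
// pair that describes one document as a sparse term-count vector.
final class TermCounts {
    final int[] indices;   // distinct term ids in ascending order
    final double[] counts; // counts[i] = number of occurrences of indices[i]

    private TermCounts(int[] indices, double[] counts) {
        this.indices = indices;
        this.counts = counts;
    }

    // Counts how often each term id occurs in one document.
    static TermCounts fromTermIds(int[] termIds, int vocabSize) {
        TreeMap<Integer, Integer> freq = new TreeMap<>();
        for (int id : termIds) {
            if (id < 0 || id >= vocabSize) {
                throw new IllegalArgumentException("term id out of range: " + id);
            }
            freq.merge(id, 1, Integer::sum);
        }
        int[] idx = new int[freq.size()];
        double[] cnt = new double[freq.size()];
        int i = 0;
        for (Map.Entry<Integer, Integer> e : freq.entrySet()) {
            idx[i] = e.getKey();
            cnt[i] = e.getValue();
            i++;
        }
        return new TermCounts(idx, cnt);
    }

    public static void main(String[] args) {
        // First ids of the sample word_vec; term 2 occurs twice.
        int vocabSize = 50; // assumed for illustration
        TermCounts tc = fromTermIds(new int[] {0, 1, 2, 3, 4, 5, 6, 2, 7, 8, 9}, vocabSize);
        System.out.println(Arrays.toString(tc.indices));
        System.out.println(Arrays.toString(tc.counts));
        // With Spark on the classpath, the vector would be built as
        // Vectors.sparse(vocabSize, tc.indices, tc.counts).
    }
}
```

Every document then reports the same vector size (the vocabulary size), no matter how many words it contains.
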
> > On Wed, Jan 13, 2016 at 6:40 PM, Li Li <fa...@gmail.com> wrote:
> >>
> >> "It looks like the problem is the vectors of term counts in the corpus
> >> are not always the vocabulary size."
> >> Do you mean that some integers do not occur in the corpus?
> >> For example, say my dictionary is 0 - 9 (10 words in total).
> >> If the docs are:
> >> 0 2 4 6 8
> >> 1 3 5 7 9
> >> then it will be correct.
> >> If the docs are:
> >> 0 2 4 6 9
> >> 1 3 5 6 7
> >> then 8 does not occur in any document. Will that be wrong?
> >>
> >> So the workaround is to preprocess the input and re-encode the terms?
> >>
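
Going by Bryan's explanation, a term that never occurs should not be a problem as long as every vector still has one slot per dictionary entry. Here is a small sketch using the two documents from the second case above (0 2 4 6 9 and 1 3 5 6 7, dictionary size 10). DenseCounts is a hypothetical helper, not a Spark class, and each resulting array is what would be wrapped with Vectors.dense.

```java
import java.util.Arrays;

// Hypothetical helper (not part of Spark): builds a dense term-count
// vector with exactly vocabSize slots, so unused terms keep a count of 0.
final class DenseCounts {
    static double[] toDenseCounts(int[] termIds, int vocabSize) {
        double[] counts = new double[vocabSize];
        for (int id : termIds) {
            if (id < 0 || id >= vocabSize) {
                throw new IllegalArgumentException("term id out of range: " + id);
            }
            counts[id] += 1.0;
        }
        return counts;
    }

    public static void main(String[] args) {
        int vocabSize = 10; // dictionary 0 - 9
        double[] doc0 = toDenseCounts(new int[] {0, 2, 4, 6, 9}, vocabSize);
        double[] doc1 = toDenseCounts(new int[] {1, 3, 5, 6, 7}, vocabSize);
        // Both vectors have length 10; slot 8 stays 0.0 in each because
        // term 8 never occurs.
        System.out.println(Arrays.toString(doc0));
        System.out.println(Arrays.toString(doc1));
    }
}
```
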
> >> On Thu, Jan 14, 2016 at 6:53 AM, Bryan Cutler <cu...@gmail.com> wrote:
> >> > I was now able to reproduce the exception using the master branch and
> >> > local mode.  It looks like the problem is the vectors of term counts in
> >> > the corpus are not always the vocabulary size.  Once I padded these with
> >> > zero counts to the vocab size, it ran without the exception.
> >> >
> >> > Joseph, I also tried calling describeTopics and noticed that with the
> >> > improper vector size, it will not throw an exception but the term
> >> > indices will start to be incorrect.  For a small number of iterations, it
> >> > is ok, but increasing iterations causes the indices to get larger also.
> >> > Maybe that is what is going on in the JIRA you linked to?
> >> >
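
One way to check a corpus for the condition Bryan describes (vectors that are not all the vocabulary size) is to collect the distinct vector lengths before training. The sketch below does this on plain arrays. VectorSizeCheck is a hypothetical name and the lengths 5 and 7 are made-up sample values. On a real RDD the same idea could presumably be expressed by mapping each Vector to its size() and taking the distinct values, which is an assumption about how one would wire it up rather than something tested here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Hypothetical diagnostic (not part of Spark): collects the distinct
// lengths found among the document vectors.
final class VectorSizeCheck {
    static TreeSet<Integer> distinctSizes(List<double[]> vectors) {
        TreeSet<Integer> sizes = new TreeSet<>();
        for (double[] v : vectors) {
            sizes.add(v.length);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Made-up sample: two documents of length 5 and one of length 7.
        List<double[]> docs = Arrays.asList(new double[5], new double[7], new double[5]);
        TreeSet<Integer> sizes = distinctSizes(docs);
        // More than one distinct size means the vectors were not all built
        // against the same vocabulary size.
        System.out.println("distinct vector sizes: " + sizes);
        System.out.println("consistent: " + (sizes.size() == 1));
    }
}
```
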
> >> > On Wed, Jan 13, 2016 at 1:17 AM, Li Li <fa...@gmail.com> wrote:
> >> >>
> >> >> I will try Spark 1.6.0 to see whether this is a bug in 1.5.2.
> >> >>
> >> >> On Wed, Jan 13, 2016 at 3:58 PM, Li Li <fa...@gmail.com> wrote:
> >> >> > I have set up a standalone Spark cluster and used the same code. It
> >> >> > still failed with the same exception.
> >> >> > I also preprocessed the data into lines of integers and used the Scala
> >> >> > code from the LDA example. It still failed.
> >> >> > The code:
> >> >> >
> >> >> > import org.apache.spark.mllib.clustering.{ LDA, DistributedLDAModel }
> >> >> > import org.apache.spark.mllib.linalg.Vectors
> >> >> > import org.apache.spark.SparkContext
> >> >> > import org.apache.spark.SparkContext._
> >> >> > import org.apache.spark.SparkConf
> >> >> >
> >> >> > object TestLDA {
> >> >> >   def main(args: Array[String]) {
> >> >> >     if (args.length != 4) {
> >> >> >       // Argument order matches the submit script below.
> >> >> >       println("need 4 args inDir iternum topic outDir")
> >> >> >       System.exit(-1)
> >> >> >     }
> >> >> >     val conf = new SparkConf().setAppName("TestLDA")
> >> >> >     val sc = new SparkContext(conf)
> >> >> >
> >> >> >     // Load and parse the data
> >> >> >     val data = sc.textFile(args(0))
> >> >> >     val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
> >> >> >
> >> >> >     // Index documents with unique IDs
> >> >> >     val corpus = parsedData.zipWithIndex.map(_.swap).cache()
> >> >> >     val topicNum = Integer.valueOf(args(2))
> >> >> >     val iterNum = Integer.valueOf(args(1))
> >> >> >
> >> >> >     // Cluster the documents into topicNum topics using LDA
> >> >> >     val ldaModel = new LDA().setK(topicNum).setMaxIterations(iterNum).run(corpus)
> >> >> >
> >> >> >     // Output topics. Each is a distribution over words (matching word
> >> >> >     // count vectors)
> >> >> >     println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
> >> >> >     val topics = ldaModel.topicsMatrix
> >> >> >     for (topic <- Range(0, topicNum)) {
> >> >> >       print("Topic " + topic + ":")
> >> >> >       for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); }
> >> >> >       println()
> >> >> >     }
> >> >> >
> >> >> >     // Save the model to the output directory (the fourth argument).
> >> >> >     ldaModel.save(sc, args(3))
> >> >> >   }
> >> >> > }
> >> >> >
> >> >> > Script to submit:
> >> >> >
> >> >> > ~/spark-1.5.2-bin-hadoop2.6/bin/spark-submit --class com.mobvoi.knowledgegraph.scala_test.TestLDA \
> >> >> >     --master spark://master:7077 \
> >> >> >     --num-executors 10 \
> >> >> >     --executor-memory 4g \
> >> >> >     --executor-cores 3 \
> >> >> >     scala_test-1.0-jar-with-dependencies.jar \
> >> >> >     /test.txt \
> >> >> >     100 \
> >> >> >     5  \
> >> >> >     /lda_model
> >> >> >
> >> >> > test.txt is attached.
> >> >> >
> >> >> >
> >> >> > On Sat, Jan 9, 2016 at 6:21 AM, Bryan Cutler <cu...@gmail.com> wrote:
> >> >> >> Hi Li,
> >> >> >>
> >> >> >> I tried out your code and sample data in both local mode and Spark
> >> >> >> Standalone and it ran correctly with output that looks good.  Sorry, I
> >> >> >> don't have a YARN cluster setup right now, so maybe the error you are
> >> >> >> seeing is specific to that.  Btw, I am running the latest Spark code
> >> >> >> from the master branch.  Hope that helps some!
> >> >> >>
> >> >> >> Bryan
> >> >> >>
> >> >> >> On Mon, Jan 4, 2016 at 8:42 PM, Li Li <fa...@gmail.com> wrote:
> >> >> >>>
> >> >> >>> Could anyone help? The problem is very easy to reproduce. What's
> >> >> >>> wrong?
> >> >> >>>
> >> >> >>> On Wed, Dec 30, 2015 at 8:59 PM, Li Li <fa...@gmail.com>
> wrote:
> >> >> >>> > I use a small data and reproduce the problem.
> >> >> >>> > But I don't know my codes are correct or not because I am not
> >> >> >>> > familiar
> >> >> >>> > with spark.
> >> >> >>> > So I first post my codes here. If it's correct, then I will
> post
> >> >> >>> > the
> >> >> >>> > data.
> >> >> >>> > one line of my data like:
> >> >> >>> >
> >> >> >>> > { "time":"08-09-17","cmtUrl":"2094361"
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >
> >> >> >>> > ,"rvId":"rev_10000020","webpageUrl":"
> http://www.dianping.com/shop/2094361","word_vec":[0,1,2,3,4,5,6,2,7,8,9
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >
> ,10,11,12,13,14,15,16,8,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,32,35,36,37,38,15,39,40,41,42,5,43,44,17,45,46,42,47,26,48,49]}
> >> >> >>> >
> >> >> >>> > it's a json file which contains webpageUrl and word_vec which
> is
> >> >> >>> > the
> >> >> >>> > encoded words.
> >> >> >>> > The first step is to prase the input rdd to a rdd of VectorUrl.
> >> >> >>> > BTW, if public VectorUrl call(String s) return null, is it ok?
> >> >> >>> > Then follow the example Index documents with unique IDs
> >> >> >>> > Then I create a rdd to map id to url so after lda training, I
> can
> >> >> >>> > find
> >> >> >>> > the url of the document. Then save this rdd to hdfs.
> >> >> >>> > Then create corpus rdd and train
> >> >> >>> >
> >> >> >>> > The exception stack is
> >> >> >>> >
> >> >> >>> > 15/12/30 20:45:42 ERROR yarn.ApplicationMaster: User class
> threw
> >> >> >>> > exception: java.lang.IndexOutOfBoundsException: (454,0) not in
> >> >> >>> > [-58,58) x [-100,100)
> >> >> >>> > java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x
> >> >> >>> > [-100,100)
> >> >> >>> > at
> >> >> >>> >
> >> >> >>> >
> breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
> >> >> >>> > at
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >
> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
> >> >> >>> > at
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >
> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
> >> >> >>> > at
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >> >> >>> > at
> >> >> >>> >
> >> >> >>> >
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> >> >> >>> > at
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >
> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
> >> >> >>> > at
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >
> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
> >> >> >>> > at
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >
> com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:89)
> >> >> >>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> >> >>> > at
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >> >> >>> > at
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> >> >>> > at java.lang.reflect.Method.invoke(Method.java:606)
> >> >> >>> > at
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
> >> >> >>> >
> >> >> >>> >
> >> >> >>> > ==========here is my codes==============
> >> >> >>> >
> >> >> >>> > SparkConf conf = new
> >> >> >>> > SparkConf().setAppName(ReviewLDA.class.getName());
> >> >> >>> >
> >> >> >>> >     JavaSparkContext sc = new JavaSparkContext(conf);
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >     // Load and parse the data
> >> >> >>> >
> >> >> >>> >     JavaRDD<String> data = sc.textFile(inputDir + "/*");
> >> >> >>> >
> >> >> >>> >     JavaRDD<VectorUrl> parsedData = data.map(new
> Function<String,
> >> >> >>> > VectorUrl>() {
> >> >> >>> >
> >> >> >>> >       public VectorUrl call(String s) {
> >> >> >>> >
> >> >> >>> >         JsonParser parser = new JsonParser();
> >> >> >>> >
> >> >> >>> >         JsonObject jo = parser.parse(s).getAsJsonObject();
> >> >> >>> >
> >> >> >>> >         if (!jo.has("word_vec") || !jo.has("webpageUrl")) {
> >> >> >>> >
> >> >> >>> >           return null;
> >> >> >>> >
> >> >> >>> >         }
> >> >> >>> >
> >> >> >>> >         JsonArray word_vec =
> jo.get("word_vec").getAsJsonArray();
> >> >> >>> >
> >> >> >>> >         String url = jo.get("webpageUrl").getAsString();
> >> >> >>> >
> >> >> >>> >         double[] values = new double[word_vec.size()];
> >> >> >>> >
> >> >> >>> >         for (int i = 0; i < values.length; i++)
> >> >> >>> >
> >> >> >>> >           values[i] = word_vec.get(i).getAsInt();
> >> >> >>> >
> >> >> >>> >         return new VectorUrl(Vectors.dense(values), url);
> >> >> >>> >
> >> >> >>> >       }
> >> >> >>> >
> >> >> >>> >     });
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >     // Index documents with unique IDs
> >> >> >>> >
> >> >> >>> >     JavaPairRDD<Long, VectorUrl> id2doc =
> >> >> >>> > JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
> >> >> >>> >
> >> >> >>> >         new Function<Tuple2<VectorUrl, Long>, Tuple2<Long,
> >> >> >>> > VectorUrl>>()
> >> >> >>> > {
> >> >> >>> >
> >> >> >>> >           public Tuple2<Long, VectorUrl> call(Tuple2<VectorUrl,
> >> >> >>> > Long>
> >> >> >>> > doc_id) {
> >> >> >>> >
> >> >> >>> >             return doc_id.swap();
> >> >> >>> >
> >> >> >>> >           }
> >> >> >>> >
> >> >> >>> >         }));
> >> >> >>> >
> >> >> >>> >     JavaPairRDD<Long, String> id2Url =
> >> >> >>> > JavaPairRDD.fromJavaRDD(id2doc
> >> >> >>> >
> >> >> >>> >         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long,
> >> >> >>> > String>>() {
> >> >> >>> >
> >> >> >>> >           @Override
> >> >> >>> >
> >> >> >>> >           public Tuple2<Long, String> call(Tuple2<Long,
> >> >> >>> > VectorUrl>
> >> >> >>> > id2doc) throws Exception {
> >> >> >>> >
> >> >> >>> >             return new Tuple2(id2doc._1, id2doc._2.url);
> >> >> >>> >
> >> >> >>> >           }
> >> >> >>> >
> >> >> >>> >         }));
> >> >> >>> >
> >> >> >>> >     id2Url.saveAsTextFile(id2UrlPath);
> >> >> >>> >
> >> >> >>> >     JavaPairRDD<Long, Vector> corpus =
> >> >> >>> > JavaPairRDD.fromJavaRDD(id2doc
> >> >> >>> >
> >> >> >>> >         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long,
> >> >> >>> > Vector>>() {
> >> >> >>> >
> >> >> >>> >           @Override
> >> >> >>> >
> >> >> >>> >           public Tuple2<Long, Vector> call(Tuple2<Long,
> >> >> >>> > VectorUrl>
> >> >> >>> > id2doc) throws Exception {
> >> >> >>> >
> >> >> >>> >             return new Tuple2(id2doc._1, id2doc._2.vec);
> >> >> >>> >
> >> >> >>> >           }
> >> >> >>> >
> >> >> >>> >         }));
> >> >> >>> >
> >> >> >>> >     corpus.cache();
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >     // Cluster the documents into three topics using LDA
> >> >> >>> >
> >> >> >>> >     DistributedLDAModel ldaModel = (DistributedLDAModel) new
> >> >> >>> > LDA().setMaxIterations(iterNumber)
> >> >> >>> >
> >> >> >>> >         .setK(topicNumber).run(corpus);
> >> >> >>> >
> >> >> >>> > On Wed, Dec 30, 2015 at 3:34 PM, Li Li <fa...@gmail.com>
> >> >> >>> > wrote:
> >> >> >>> >> I will use a portion of data and try. will the hdfs block
> affect
> >> >> >>> >> spark?(if so, it's hard to reproduce)
> >> >> >>> >>
> >> >> >>> >> On Wed, Dec 30, 2015 at 3:22 AM, Joseph Bradley
> >> >> >>> >> <jo...@databricks.com>
> >> >> >>> >> wrote:
> >> >> >>> >>> Hi Li,
> >> >> >>> >>>
> >> >> >>> >>> I'm wondering if you're running into the same bug reported
> >> >> >>> >>> here:
> >> >> >>> >>> https://issues.apache.org/jira/browse/SPARK-12488
> >> >> >>> >>>
> >> >> >>> >>> I haven't figured out yet what is causing it.  Do you have a
> >> >> >>> >>> small
> >> >> >>> >>> corpus
> >> >> >>> >>> which reproduces this error, and which you can share on the
> >> >> >>> >>> JIRA?
> >> >> >>> >>> If
> >> >> >>> >>> so,
> >> >> >>> >>> that would help a lot in debugging this failure.
> >> >> >>> >>>
> >> >> >>> >>> Thanks!
> >> >> >>> >>> Joseph
> >> >> >>> >>>
> >> >> >>> >>> On Sun, Dec 27, 2015 at 7:26 PM, Li Li <fa...@gmail.com>
> >> >> >>> >>> wrote:
> >> >> >>> >>>>
> >> >> >>> >>>> I ran my lda example in a yarn 2.6.2 cluster with spark
> 1.5.2.
> >> >> >>> >>>> it throws exception in line:   Matrix topics =
> >> >> >>> >>>> ldaModel.topicsMatrix();
> >> >> >>> >>>> But in yarn job history ui, it's successful. What's wrong
> with
> >> >> >>> >>>> it?
> >> >> >>> >>>> I submit job with
> >> >> >>> >>>> .bin/spark-submit --class Myclass \
> >> >> >>> >>>>     --master yarn-client \
> >> >> >>> >>>>     --num-executors 2 \
> >> >> >>> >>>>     --driver-memory 4g \
> >> >> >>> >>>>     --executor-memory 4g \
> >> >> >>> >>>>     --executor-cores 1 \
> >> >> >>> >>>>
> >> >> >>> >>>>
> >> >> >>> >>>> My codes:
> >> >> >>> >>>>
> >> >> >>> >>>>    corpus.cache();
> >> >> >>> >>>>
> >> >> >>> >>>>
> >> >> >>> >>>>     // Cluster the documents into three topics using LDA
> >> >> >>> >>>>
> >> >> >>> >>>>     DistributedLDAModel ldaModel = (DistributedLDAModel) new
> >> >> >>> >>>>
> >> >> >>> >>>>
> >> >> >>> >>>>
> >> >> >>> >>>>
> >> >> >>> >>>>
> LDA().setOptimizer("em").setMaxIterations(iterNumber).setK(topicNumber).run(corpus);
> >> >> >>> >>>>
> >> >> >>> >>>>
> >> >> >>> >>>>     // Output topics. Each is a distribution over words
> >> >> >>> >>>> (matching
> >> >> >>> >>>> word
> >> >> >>> >>>> count vectors)
> >> >> >>> >>>>
> >> >> >>> >>>>     System.out.println("Learned topics (as distributions
> over
> >> >> >>> >>>> vocab
> >> >> >>> >>>> of
> >> >> >>> >>>> " + ldaModel.vocabSize()
> >> >> >>> >>>>
> >> >> >>> >>>>         + " words):");
> >> >> >>> >>>>
> >> >> >>> >>>>    //Line81, exception here:    Matrix topics =
> >> >> >>> >>>> ldaModel.topicsMatrix();
> >> >> >>> >>>>
> >> >> >>> >>>>     for (int topic = 0; topic < topicNumber; topic++) {
> >> >> >>> >>>>
> >> >> >>> >>>>       System.out.print("Topic " + topic + ":");
> >> >> >>> >>>>
> >> >> >>> >>>>       for (int word = 0; word < ldaModel.vocabSize();
> word++)
> >> >> >>> >>>> {
> >> >> >>> >>>>
> >> >> >>> >>>>         System.out.print(" " + topics.apply(word, topic));
> >> >> >>> >>>>
> >> >> >>> >>>>       }
> >> >> >>> >>>>
> >> >> >>> >>>>       System.out.println();
> >> >> >>> >>>>
> >> >> >>> >>>>     }
> >> >> >>> >>>>
> >> >> >>> >>>>
> >> >> >>> >>>>     ldaModel.save(sc.sc(), modelPath);
> >> >> >>> >>>>
> >> >> >>> >>>>
> >> >> >>> >>>> Exception in thread "main" java.lang.IndexOutOfBoundsException: (1025,0) not in [-58,58) x [-100,100)
> >> >> >>> >>>>         at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
> >> >> >>> >>>>         at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
> >> >> >>> >>>>         at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
> >> >> >>> >>>>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >> >> >>> >>>>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> >> >> >>> >>>>         at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
> >> >> >>> >>>>         at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
> >> >> >>> >>>>         at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:81)
> >> >> >>> >>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> >> >>> >>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >> >> >>> >>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> >> >>> >>>>         at java.lang.reflect.Method.invoke(Method.java:606)
> >> >> >>> >>>>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
> >> >> >>> >>>>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> >> >> >>> >>>>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> >> >> >>> >>>>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
> >> >> >>> >>>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> >> >> >>> >>>>
> >> >> >>> >>>> 15/12/23 00:01:16 INFO spark.SparkContext: Invoking stop() from shutdown hook

Re: running lda in spark throws exception

Posted by Li Li <fa...@gmail.com>.
I have modified my code. I can now get the total vocabulary size, the
index array, and the frequency array from the JSON object.

        // Term indices and their counts for this document (parallel arrays)
        JsonArray idxArr = jo.get("idxArr").getAsJsonArray();
        JsonArray freqArr = jo.get("freqArr").getAsJsonArray();
        // Size of the whole vocabulary, not of this document
        int total = jo.get("vocabSize").getAsInt();
        String url = jo.get("webpageUrl").getAsString();
        int[] idx = new int[idxArr.size()];
        double[] freq = new double[freqArr.size()];
        for (int i = 0; i < idxArr.size(); i++) {
          idx[i] = idxArr.get(i).getAsInt();
          freq[i] = freqArr.get(i).getAsDouble();
        }

        // Sparse count vector whose size is the vocabulary size
        return new VectorUrl(Vectors.sparse(total, idx, freq), url);

But when I run it, the job fails with this exception:

Job aborted due to stage failure: Total size of serialized results of 22
tasks (3.1 GB) is bigger than spark.driver.maxResultSize (3.0 GB)

I have set the result size limit to 3g, but it is still not enough:

       conf.set("spark.driver.maxResultSize", "3g");

How much memory will it use?
I use the following script to submit the job to the YARN cluster.

bin/spark-submit --class xxx.yyyy.ReviewLDA \
    --master yarn-cluster \
    --num-executors 10 \
    --driver-memory 4g \
    --executor-memory 4g \
    --executor-cores 2
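
Regarding the memory question above, one piece that can be estimated by
hand is the topics matrix itself. The MLlib documentation describes
topicsMatrix as a vocabSize x k matrix, and for DistributedLDAModel it is
collected to the driver. Below is a minimal sketch of that arithmetic in
Java, assuming a dense matrix of 8-byte doubles. The class name, the method
name, and the example sizes are made up for illustration; the thread does not
state the real vocabulary size, and the estimate ignores serialization
overhead and anything else the job collects, so it does not by itself explain
the 3.1 GB reported above.

```java
// Rough size estimate for a dense vocabSize x k matrix of doubles.
// Hypothetical helper for illustration; not part of Spark.
final class TopicsMatrixSize {

    /** Bytes needed for the raw values of a dense vocabSize x k matrix of doubles. */
    static long denseMatrixBytes(long vocabSize, long k) {
        return vocabSize * k * Double.BYTES;
    }

    public static void main(String[] args) {
        long vocabSize = 1_000_000L; // assumed vocabulary size, not from the thread
        long k = 100L;               // assumed number of topics, not from the thread
        long bytes = denseMatrixBytes(vocabSize, k);
        System.out.printf("%d bytes (about %.1f MiB)%n", bytes, bytes / (1024.0 * 1024.0));
    }
}
```

If such an estimate comes close to spark.driver.maxResultSize or to the
driver memory, describeTopics(maxTermsPerTopic), which is mentioned elsewhere
in this thread, returns only the top terms of each topic and may be a lighter
alternative to the full matrix.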

On Fri, Jan 15, 2016 at 3:24 AM, Bryan Cutler <cu...@gmail.com> wrote:
> What I mean is the input to LDA.run() is a RDD[(Long, Vector)] and the
> Vector is a vector of counts of each term and should be the same size as the
> vocabulary (so if the vocabulary, or dictionary has 10 words, each vector
> should have a size of 10).  This probably means that there will be some
> elements with zero counts, and a sparse vector might be a good way to handle
> that.
>
> On Wed, Jan 13, 2016 at 6:40 PM, Li Li <fa...@gmail.com> wrote:
>>
>> It looks like the problem is the vectors of term counts in the corpus
>> are not always the vocabulary size.
>> Do you mean that some integers do not occur in the corpus?
>> For example, say the dictionary is 0 - 9 (10 words in total).
>> If the docs are:
>> 0 2 4 6 8
>> 1 3 5 7 9
>> then it will be correct.
>> If the docs are:
>> 0 2 4 6 9
>> 1 3 5 6 7
>> then 8 does not occur in any document. Will it be wrong then?
>>
>> So the workaround is to process the input to re-encode terms?
>>
>> On Thu, Jan 14, 2016 at 6:53 AM, Bryan Cutler <cu...@gmail.com> wrote:
>> > I was now able to reproduce the exception using the master branch and
>> > local mode.  It looks like the problem is the vectors of term counts in
>> > the corpus are not always the vocabulary size.  Once I padded these with
>> > zero counts to the vocab size, it ran without the exception.
>> >
>> > Joseph, I also tried calling describeTopics and noticed that with the
>> > improper vector size, it will not throw an exception but the term
>> > indices will start to be incorrect.  For a small number of iterations,
>> > it is ok, but increasing iterations causes the indices to get larger
>> > also.  Maybe that is what is going on in the JIRA you linked to?
>> >
>> > On Wed, Jan 13, 2016 at 1:17 AM, Li Li <fa...@gmail.com> wrote:
>> >>
>> >> I will try Spark 1.6.0 to see whether it is a bug in 1.5.2.
>> >>
>> >> On Wed, Jan 13, 2016 at 3:58 PM, Li Li <fa...@gmail.com> wrote:
>> >> > I have set up a standalone Spark cluster and used the same code. It
>> >> > still failed with the same exception.
>> >> > I also preprocessed the data into lines of integers and used the
>> >> > Scala code from the LDA example. It still failed.
>> >> > The code:
>> >> >
>> >> > import org.apache.spark.mllib.clustering.{ LDA, DistributedLDAModel }
>> >> > import org.apache.spark.mllib.linalg.Vectors
>> >> > import org.apache.spark.SparkContext
>> >> > import org.apache.spark.SparkContext._
>> >> > import org.apache.spark.SparkConf
>> >> >
>> >> > object TestLDA {
>> >> >   def main(args: Array[String]) {
>> >> >     if (args.length != 4) {
>> >> >       println("need 4 args: inFile iterNum topicNum outDir")
>> >> >       System.exit(-1)
>> >> >     }
>> >> >     val conf = new SparkConf().setAppName("TestLDA")
>> >> >     val sc = new SparkContext(conf)
>> >> >
>> >> >     // Load and parse the data: one dense vector per input line
>> >> >     val data = sc.textFile(args(0))
>> >> >     val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
>> >> >
>> >> >     // Index documents with unique IDs
>> >> >     val corpus = parsedData.zipWithIndex.map(_.swap).cache()
>> >> >     val iterNum = args(1).toInt
>> >> >     val topicNum = args(2).toInt
>> >> >
>> >> >     // Cluster the documents into topicNum topics using LDA
>> >> >     val ldaModel = new LDA().setK(topicNum).setMaxIterations(iterNum).run(corpus)
>> >> >
>> >> >     // Output topics. Each is a distribution over words (matching word count vectors)
>> >> >     println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
>> >> >     val topics = ldaModel.topicsMatrix
>> >> >     for (topic <- Range(0, topicNum)) {
>> >> >       print("Topic " + topic + ":")
>> >> >       for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); }
>> >> >       println()
>> >> >     }
>> >> >
>> >> >     // Save the model to the output directory (fourth argument)
>> >> >     ldaModel.save(sc, args(3))
>> >> >   }
>> >> > }
>> >> >
>> >> > Script to submit:
>> >> >
>> >> > ~/spark-1.5.2-bin-hadoop2.6/bin/spark-submit --class com.mobvoi.knowledgegraph.scala_test.TestLDA \
>> >> >     --master spark://master:7077 \
>> >> >     --num-executors 10 \
>> >> >     --executor-memory 4g \
>> >> >     --executor-cores 3 \
>> >> >     scala_test-1.0-jar-with-dependencies.jar \
>> >> >     /test.txt \
>> >> >     100 \
>> >> >     5 \
>> >> >     /lda_model
>> >> >
>> >> > test.txt is attached.
>> >> >
>> >> >
>> >> > On Sat, Jan 9, 2016 at 6:21 AM, Bryan Cutler <cu...@gmail.com> wrote:
>> >> >> Hi Li,
>> >> >>
>> >> >> I tried out your code and sample data in both local mode and Spark
>> >> >> Standalone and it ran correctly with output that looks good.  Sorry,
>> >> >> I don't have a YARN cluster setup right now, so maybe the error you
>> >> >> are seeing is specific to that.  Btw, I am running the latest Spark
>> >> >> code from the master branch.  Hope that helps some!
>> >> >>
>> >> >> Bryan
>> >> >>
>> >> >> On Mon, Jan 4, 2016 at 8:42 PM, Li Li <fa...@gmail.com> wrote:
>> >> >>>
>> >> >>> Could anyone help? The problem is very easy to reproduce. What's
>> >> >>> wrong?
>> >> >>>
>> >> >>> On Wed, Dec 30, 2015 at 8:59 PM, Li Li <fa...@gmail.com> wrote:
>> >> >>> > I used a small data set and reproduced the problem.
>> >> >>> > But I don't know whether my code is correct because I am not
>> >> >>> > familiar with Spark.
>> >> >>> > So I will first post my code here. If it's correct, I will then
>> >> >>> > post the data.
>> >> >>> > One line of my data looks like:
>> >> >>> >
>> >> >>> > { "time":"08-09-17","cmtUrl":"2094361","rvId":"rev_10000020","webpageUrl":"http://www.dianping.com/shop/2094361","word_vec":[0,1,2,3,4,5,6,2,7,8,9,10,11,12,13,14,15,16,8,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,32,35,36,37,38,15,39,40,41,42,5,43,44,17,45,46,42,47,26,48,49]}
>> >> >>> >
>> >> >>> > It's a JSON file which contains webpageUrl and word_vec, which is
>> >> >>> > the list of encoded words.
>> >> >>> > The first step is to parse the input RDD into an RDD of VectorUrl.
>> >> >>> > BTW, is it OK if public VectorUrl call(String s) returns null?
>> >> >>> > Then, following the example, I index documents with unique IDs.
>> >> >>> > Then I create an RDD that maps id to url so that after LDA training
>> >> >>> > I can find the url of each document, and save this RDD to HDFS.
>> >> >>> > Then I create the corpus RDD and train.
>> >> >>> >
>> >> >>> > The exception stack is
>> >> >>> >
>> >> >>> > 15/12/30 20:45:42 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100)
>> >> >>> > java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100)
>> >> >>> > at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
>> >> >>> > at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
>> >> >>> > at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
>> >> >>> > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> >> >>> > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> >> >>> > at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
>> >> >>> > at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
>> >> >>> > at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:89)
>> >> >>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >> >>> > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >> >>> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >> >>> > at java.lang.reflect.Method.invoke(Method.java:606)
>> >> >>> > at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
>> >> >>> >
>> >> >>> >
>> >> >>> > ==========here is my codes==============
>> >> >>> >
>> >> >>> >     SparkConf conf = new SparkConf().setAppName(ReviewLDA.class.getName());
>> >> >>> >     JavaSparkContext sc = new JavaSparkContext(conf);
>> >> >>> >
>> >> >>> >     // Load and parse the data
>> >> >>> >     JavaRDD<String> data = sc.textFile(inputDir + "/*");
>> >> >>> >     JavaRDD<VectorUrl> parsedData = data.map(new Function<String, VectorUrl>() {
>> >> >>> >       public VectorUrl call(String s) {
>> >> >>> >         JsonParser parser = new JsonParser();
>> >> >>> >         JsonObject jo = parser.parse(s).getAsJsonObject();
>> >> >>> >         if (!jo.has("word_vec") || !jo.has("webpageUrl")) {
>> >> >>> >           return null;
>> >> >>> >         }
>> >> >>> >         JsonArray word_vec = jo.get("word_vec").getAsJsonArray();
>> >> >>> >         String url = jo.get("webpageUrl").getAsString();
>> >> >>> >         double[] values = new double[word_vec.size()];
>> >> >>> >         for (int i = 0; i < values.length; i++)
>> >> >>> >           values[i] = word_vec.get(i).getAsInt();
>> >> >>> >         return new VectorUrl(Vectors.dense(values), url);
>> >> >>> >       }
>> >> >>> >     });
>> >> >>> >
>> >> >>> >     // Index documents with unique IDs
>> >> >>> >     JavaPairRDD<Long, VectorUrl> id2doc = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
>> >> >>> >         new Function<Tuple2<VectorUrl, Long>, Tuple2<Long, VectorUrl>>() {
>> >> >>> >           public Tuple2<Long, VectorUrl> call(Tuple2<VectorUrl, Long> doc_id) {
>> >> >>> >             return doc_id.swap();
>> >> >>> >           }
>> >> >>> >         }));
>> >> >>> >     JavaPairRDD<Long, String> id2Url = JavaPairRDD.fromJavaRDD(id2doc
>> >> >>> >         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, String>>() {
>> >> >>> >           @Override
>> >> >>> >           public Tuple2<Long, String> call(Tuple2<Long, VectorUrl> id2doc) throws Exception {
>> >> >>> >             return new Tuple2(id2doc._1, id2doc._2.url);
>> >> >>> >           }
>> >> >>> >         }));
>> >> >>> >     id2Url.saveAsTextFile(id2UrlPath);
>> >> >>> >     JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(id2doc
>> >> >>> >         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, Vector>>() {
>> >> >>> >           @Override
>> >> >>> >           public Tuple2<Long, Vector> call(Tuple2<Long, VectorUrl> id2doc) throws Exception {
>> >> >>> >             return new Tuple2(id2doc._1, id2doc._2.vec);
>> >> >>> >           }
>> >> >>> >         }));
>> >> >>> >     corpus.cache();
>> >> >>> >
>> >> >>> >     // Cluster the documents into three topics using LDA
>> >> >>> >     DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA().setMaxIterations(iterNumber)
>> >> >>> >         .setK(topicNumber).run(corpus);
>> >> >>> >
>> >> >>> > On Wed, Dec 30, 2015 at 3:34 PM, Li Li <fa...@gmail.com> wrote:
>> >> >>> >> I will try with a portion of the data. Will the HDFS block layout
>> >> >>> >> affect Spark? (If so, it will be hard to reproduce.)
>> >> >>> >>
>> >> >>> >> On Wed, Dec 30, 2015 at 3:22 AM, Joseph Bradley <jo...@databricks.com> wrote:
>> >> >>> >>> Hi Li,
>> >> >>> >>>
>> >> >>> >>> I'm wondering if you're running into the same bug reported here:
>> >> >>> >>> https://issues.apache.org/jira/browse/SPARK-12488
>> >> >>> >>>
>> >> >>> >>> I haven't figured out yet what is causing it.  Do you have a small
>> >> >>> >>> corpus which reproduces this error, and which you can share on the
>> >> >>> >>> JIRA?  If so, that would help a lot in debugging this failure.
>> >> >>> >>>
>> >> >>> >>> Thanks!
>> >> >>> >>> Joseph
>> >> >>> >>>
>> >> >>> >>> On Sun, Dec 27, 2015 at 7:26 PM, Li Li <fa...@gmail.com> wrote:
>> >> >>> >>>>
>> >> >>> >>>> I ran my lda example in a yarn 2.6.2 cluster with spark 1.5.2.
>> >> >>> >>>> it throws exception in line:   Matrix topics = ldaModel.topicsMatrix();
>> >> >>> >>>> But in yarn job history ui, it's successful. What's wrong with it?
>> >> >>> >>>> I submit job with
>> >> >>> >>>> .bin/spark-submit --class Myclass \
>> >> >>> >>>>     --master yarn-client \
>> >> >>> >>>>     --num-executors 2 \
>> >> >>> >>>>     --driver-memory 4g \
>> >> >>> >>>>     --executor-memory 4g \
>> >> >>> >>>>     --executor-cores 1 \
>> >> >>> >>>>
>> >> >>> >>>> My codes:
>> >> >>> >>>>
>> >> >>> >>>>     corpus.cache();
>> >> >>> >>>>
>> >> >>> >>>>     // Cluster the documents into three topics using LDA
>> >> >>> >>>>     DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA().setOptimizer("em").setMaxIterations(iterNumber).setK(topicNumber).run(corpus);
>> >> >>> >>>>
>> >> >>> >>>>     // Output topics. Each is a distribution over words (matching word count vectors)
>> >> >>> >>>>     System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
>> >> >>> >>>>         + " words):");
>> >> >>> >>>>     Matrix topics = ldaModel.topicsMatrix();  // Line81, exception here
>> >> >>> >>>>     for (int topic = 0; topic < topicNumber; topic++) {
>> >> >>> >>>>       System.out.print("Topic " + topic + ":");
>> >> >>> >>>>       for (int word = 0; word < ldaModel.vocabSize(); word++) {
>> >> >>> >>>>         System.out.print(" " + topics.apply(word, topic));
>> >> >>> >>>>       }
>> >> >>> >>>>       System.out.println();
>> >> >>> >>>>     }
>> >> >>> >>>>
>> >> >>> >>>>     ldaModel.save(sc.sc(), modelPath);
>> >> >>> >>>>
>> >> >>> >>>>
>> >> >>> >>>> Exception in thread "main" java.lang.IndexOutOfBoundsException: (1025,0) not in [-58,58) x [-100,100)
>> >> >>> >>>>         at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
>> >> >>> >>>>         at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
>> >> >>> >>>>         at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
>> >> >>> >>>>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> >> >>> >>>>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> >> >>> >>>>         at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
>> >> >>> >>>>         at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
>> >> >>> >>>>         at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:81)
>> >> >>> >>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >> >>> >>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >> >>> >>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >> >>> >>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>> >> >>> >>>>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
>> >> >>> >>>>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>> >> >>> >>>>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>> >> >>> >>>>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>> >> >>> >>>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> >> >>> >>>>
>> >> >>> >>>> 15/12/23 00:01:16 INFO spark.SparkContext: Invoking stop() from shutdown hook

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: running lda in spark throws exception

Posted by Bryan Cutler <cu...@gmail.com>.
What I mean is the input to LDA.run() is a RDD[(Long, Vector)] and the
Vector is a vector of counts of each term and should be the same size as
the vocabulary (so if the vocabulary, or dictionary has 10 words, each
vector should have a size of 10).  This probably means that there will be
some elements with zero counts, and a sparse vector might be a good way to
handle that.
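
The requirement described above (one count per vocabulary term, with every
vector sized to the vocabulary) can be sketched in plain Java as follows.
This is only an illustration under stated assumptions: the TermCounts class
and its methods are hypothetical names, not part of Spark, and the input is
assumed to be a list of encoded term ids like the word_vec field shown
elsewhere in this thread. The sketch turns the term ids of one document into
sorted indices with their counts, and also shows the zero-padded dense form.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of Spark.
final class TermCounts {
    final int[] indices;   // distinct term ids, strictly increasing
    final double[] counts; // counts[i] = occurrences of indices[i] in the document

    private TermCounts(int[] indices, double[] counts) {
        this.indices = indices;
        this.counts = counts;
    }

    /** Counts how often each term id occurs in one document. */
    static TermCounts fromTermIds(int[] termIds, int vocabSize) {
        TreeMap<Integer, Integer> freq = new TreeMap<>(); // kept sorted by term id
        for (int id : termIds) {
            if (id < 0 || id >= vocabSize) {
                throw new IllegalArgumentException(
                    "term id " + id + " outside [0, " + vocabSize + ")");
            }
            freq.merge(id, 1, Integer::sum);
        }
        int[] indices = new int[freq.size()];
        double[] counts = new double[freq.size()];
        int i = 0;
        for (Map.Entry<Integer, Integer> e : freq.entrySet()) {
            indices[i] = e.getKey();
            counts[i] = e.getValue();
            i++;
        }
        return new TermCounts(indices, counts);
    }

    /** The same document as a dense count vector padded with zeros to vocabSize. */
    double[] toDense(int vocabSize) {
        double[] dense = new double[vocabSize];
        for (int i = 0; i < indices.length; i++) {
            dense[indices[i]] = counts[i];
        }
        return dense;
    }

    public static void main(String[] args) {
        int vocabSize = 10; // dictionary 0 - 9, as in the example in this thread
        TermCounts doc = fromTermIds(new int[] {0, 2, 4, 6, 9, 2}, vocabSize);
        System.out.println(Arrays.toString(doc.indices));
        System.out.println(Arrays.toString(doc.counts));
        System.out.println(Arrays.toString(doc.toDense(vocabSize)));
    }
}
```

With Spark on the classpath, the two arrays could then be passed to
Vectors.sparse(vocabSize, indices, counts), so that every document vector
has the vocabulary size even when some terms never occur in it.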

On Wed, Jan 13, 2016 at 6:40 PM, Li Li <fa...@gmail.com> wrote:

> It looks like the problem is the vectors of term counts in the corpus
> are not always the vocabulary size.
> Do you mean that some integers do not occur in the corpus?
> For example, say the dictionary is 0 - 9 (10 words in total).
> If the docs are:
> 0 2 4 6 8
> 1 3 5 7 9
> then it will be correct.
> If the docs are:
> 0 2 4 6 9
> 1 3 5 6 7
> then 8 does not occur in any document. Will it be wrong then?
>
> So the workaround is to process the input to re-encode terms?
>
> On Thu, Jan 14, 2016 at 6:53 AM, Bryan Cutler <cu...@gmail.com> wrote:
> > I was now able to reproduce the exception using the master branch and
> > local mode.  It looks like the problem is the vectors of term counts in
> > the corpus are not always the vocabulary size.  Once I padded these with
> > zero counts to the vocab size, it ran without the exception.
> >
> > Joseph, I also tried calling describeTopics and noticed that with the
> > improper vector size, it will not throw an exception but the term indices
> > will start to be incorrect.  For a small number of iterations, it is ok,
> > but increasing iterations causes the indices to get larger also.  Maybe
> > that is what is going on in the JIRA you linked to?
> >
> > On Wed, Jan 13, 2016 at 1:17 AM, Li Li <fa...@gmail.com> wrote:
> >>
> >> I will try Spark 1.6.0 to see whether it is a bug in 1.5.2.
> >>
> >> On Wed, Jan 13, 2016 at 3:58 PM, Li Li <fa...@gmail.com> wrote:
> >> > I have set up a standalone Spark cluster and used the same code. It
> >> > still failed with the same exception.
> >> > I also preprocessed the data into lines of integers and used the
> >> > Scala code from the LDA example. It still failed.
> >> > The code:
> >> >
> >> > import org.apache.spark.mllib.clustering.{ LDA, DistributedLDAModel }
> >> > import org.apache.spark.mllib.linalg.Vectors
> >> > import org.apache.spark.SparkContext
> >> > import org.apache.spark.SparkContext._
> >> > import org.apache.spark.SparkConf
> >> >
> >> > object TestLDA {
> >> >   def main(args: Array[String]) {
> >> >     if (args.length != 4) {
> >> >       println("need 4 args: inFile iterNum topicNum outDir")
> >> >       System.exit(-1)
> >> >     }
> >> >     val conf = new SparkConf().setAppName("TestLDA")
> >> >     val sc = new SparkContext(conf)
> >> >
> >> >     // Load and parse the data: one dense vector per input line
> >> >     val data = sc.textFile(args(0))
> >> >     val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
> >> >
> >> >     // Index documents with unique IDs
> >> >     val corpus = parsedData.zipWithIndex.map(_.swap).cache()
> >> >     val iterNum = args(1).toInt
> >> >     val topicNum = args(2).toInt
> >> >
> >> >     // Cluster the documents into topicNum topics using LDA
> >> >     val ldaModel = new LDA().setK(topicNum).setMaxIterations(iterNum).run(corpus)
> >> >
> >> >     // Output topics. Each is a distribution over words (matching word count vectors)
> >> >     println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
> >> >     val topics = ldaModel.topicsMatrix
> >> >     for (topic <- Range(0, topicNum)) {
> >> >       print("Topic " + topic + ":")
> >> >       for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); }
> >> >       println()
> >> >     }
> >> >
> >> >     // Save the model to the output directory (fourth argument)
> >> >     ldaModel.save(sc, args(3))
> >> >   }
> >> > }
> >> >
> >> > Script to submit:
> >> >
> >> > ~/spark-1.5.2-bin-hadoop2.6/bin/spark-submit --class com.mobvoi.knowledgegraph.scala_test.TestLDA \
> >> >     --master spark://master:7077 \
> >> >     --num-executors 10 \
> >> >     --executor-memory 4g \
> >> >     --executor-cores 3 \
> >> >     scala_test-1.0-jar-with-dependencies.jar \
> >> >     /test.txt \
> >> >     100 \
> >> >     5 \
> >> >     /lda_model
> >> >
> >> > test.txt is attached.
> >> >
> >> >
> >> > On Sat, Jan 9, 2016 at 6:21 AM, Bryan Cutler <cu...@gmail.com> wrote:
> >> >> Hi Li,
> >> >>
> >> >> I tried out your code and sample data in both local mode and Spark
> >> >> Standalone and it ran correctly with output that looks good.  Sorry,
> >> >> I don't have a YARN cluster setup right now, so maybe the error you
> >> >> are seeing is specific to that.  Btw, I am running the latest Spark
> >> >> code from the master branch.  Hope that helps some!
> >> >>
> >> >> Bryan
> >> >>
> >> >> On Mon, Jan 4, 2016 at 8:42 PM, Li Li <fa...@gmail.com> wrote:
> >> >>>
> >> >>> Could anyone help? The problem is very easy to reproduce. What's
> >> >>> wrong?
> >> >>>
> >> >>> On Wed, Dec 30, 2015 at 8:59 PM, Li Li <fa...@gmail.com> wrote:
> >> >>> > I used a small data set and reproduced the problem.
> >> >>> > But I don't know whether my code is correct because I am not
> >> >>> > familiar with Spark.
> >> >>> > So I will first post my code here. If it's correct, I will then
> >> >>> > post the data.
> >> >>> > One line of my data looks like:
> >> >>> >
> >> >>> > { "time":"08-09-17","cmtUrl":"2094361","rvId":"rev_10000020","webpageUrl":"http://www.dianping.com/shop/2094361","word_vec":[0,1,2,3,4,5,6,2,7,8,9,10,11,12,13,14,15,16,8,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,32,35,36,37,38,15,39,40,41,42,5,43,44,17,45,46,42,47,26,48,49]}
> >> >>> >
> >> >>> > It's a JSON file which contains webpageUrl and word_vec, which is
> >> >>> > the list of encoded words.
> >> >>> > The first step is to parse the input RDD into an RDD of VectorUrl.
> >> >>> > BTW, is it OK if public VectorUrl call(String s) returns null?
> >> >>> > Then, following the example, I index documents with unique IDs.
> >> >>> > Then I create an RDD that maps id to url so that after LDA training
> >> >>> > I can find the url of each document, and save this RDD to HDFS.
> >> >>> > Then I create the corpus RDD and train.
> >> >>> >
> >> >>> > The exception stack is
> >> >>> >
> >> >>> > 15/12/30 20:45:42 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100)
> >> >>> > java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100)
> >> >>> > at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
> >> >>> > at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
> >> >>> > at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
> >> >>> > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >> >>> > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> >> >>> > at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
> >> >>> > at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
> >> >>> > at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:89)
> >> >>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> >>> > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >> >>> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> >>> > at java.lang.reflect.Method.invoke(Method.java:606)
> >> >>> > at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
> >> >>> >
> >> >>> >
> >> >>> > ==========here is my codes==============
> >> >>> >
> >> >>> >     SparkConf conf = new SparkConf().setAppName(ReviewLDA.class.getName());
> >> >>> >     JavaSparkContext sc = new JavaSparkContext(conf);
> >> >>> >
> >> >>> >     // Load and parse the data
> >> >>> >     JavaRDD<String> data = sc.textFile(inputDir + "/*");
> >> >>> >     JavaRDD<VectorUrl> parsedData = data.map(new Function<String, VectorUrl>() {
> >> >>> >       public VectorUrl call(String s) {
> >> >>> >         JsonParser parser = new JsonParser();
> >> >>> >         JsonObject jo = parser.parse(s).getAsJsonObject();
> >> >>> >         if (!jo.has("word_vec") || !jo.has("webpageUrl")) {
> >> >>> >           return null;
> >> >>> >         }
> >> >>> >         JsonArray word_vec = jo.get("word_vec").getAsJsonArray();
> >> >>> >         String url = jo.get("webpageUrl").getAsString();
> >> >>> >         double[] values = new double[word_vec.size()];
> >> >>> >         for (int i = 0; i < values.length; i++)
> >> >>> >           values[i] = word_vec.get(i).getAsInt();
> >> >>> >         return new VectorUrl(Vectors.dense(values), url);
> >> >>> >       }
> >> >>> >     });
> >> >>> >
> >> >>> >     // Index documents with unique IDs
> >> >>> >     JavaPairRDD<Long, VectorUrl> id2doc = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
> >> >>> >         new Function<Tuple2<VectorUrl, Long>, Tuple2<Long, VectorUrl>>() {
> >> >>> >           public Tuple2<Long, VectorUrl> call(Tuple2<VectorUrl, Long> doc_id) {
> >> >>> >             return doc_id.swap();
> >> >>> >           }
> >> >>> >         }));
> >> >>> >     JavaPairRDD<Long, String> id2Url = JavaPairRDD.fromJavaRDD(id2doc
> >> >>> >         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, String>>() {
> >> >>> >           @Override
> >> >>> >           public Tuple2<Long, String> call(Tuple2<Long, VectorUrl> id2doc) throws Exception {
> >> >>> >             return new Tuple2(id2doc._1, id2doc._2.url);
> >> >>> >           }
> >> >>> >         }));
> >> >>> >     id2Url.saveAsTextFile(id2UrlPath);
> >> >>> >     JavaPairRDD<Long, Vector> corpus =
> >> >>> > JavaPairRDD.fromJavaRDD(id2doc
> >> >>> >
> >> >>> >         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long,
> >> >>> > Vector>>() {
> >> >>> >
> >> >>> >           @Override
> >> >>> >
> >> >>> >           public Tuple2<Long, Vector> call(Tuple2<Long, VectorUrl>
> >> >>> > id2doc) throws Exception {
> >> >>> >
> >> >>> >             return new Tuple2(id2doc._1, id2doc._2.vec);
> >> >>> >
> >> >>> >           }
> >> >>> >
> >> >>> >         }));
> >> >>> >
> >> >>> >     corpus.cache();
> >> >>> >
> >> >>> >
> >> >>> >     // Cluster the documents into three topics using LDA
> >> >>> >
> >> >>> >     DistributedLDAModel ldaModel = (DistributedLDAModel) new
> >> >>> > LDA().setMaxIterations(iterNumber)
> >> >>> >
> >> >>> >         .setK(topicNumber).run(corpus);
> >> >>> >
> >> >>> > On Wed, Dec 30, 2015 at 3:34 PM, Li Li <fa...@gmail.com>
> wrote:
> >> >>> >> I will use a portion of data and try. will the hdfs block affect
> >> >>> >> spark?(if so, it's hard to reproduce)
> >> >>> >>
> >> >>> >> On Wed, Dec 30, 2015 at 3:22 AM, Joseph Bradley
> >> >>> >> <jo...@databricks.com>
> >> >>> >> wrote:
> >> >>> >>> Hi Li,
> >> >>> >>>
> >> >>> >>> I'm wondering if you're running into the same bug reported here:
> >> >>> >>> https://issues.apache.org/jira/browse/SPARK-12488
> >> >>> >>>
> >> >>> >>> I haven't figured out yet what is causing it.  Do you have a
> small
> >> >>> >>> corpus
> >> >>> >>> which reproduces this error, and which you can share on the
> JIRA?
> >> >>> >>> If
> >> >>> >>> so,
> >> >>> >>> that would help a lot in debugging this failure.
> >> >>> >>>
> >> >>> >>> Thanks!
> >> >>> >>> Joseph
> >> >>> >>>
> >> >>> >>> On Sun, Dec 27, 2015 at 7:26 PM, Li Li <fa...@gmail.com>
> >> >>> >>> wrote:
> >> >>> >>>>
> >> >>> >>>> I ran my lda example in a yarn 2.6.2 cluster with spark 1.5.2.
> >> >>> >>>> it throws exception in line:   Matrix topics =
> >> >>> >>>> ldaModel.topicsMatrix();
> >> >>> >>>> But in yarn job history ui, it's successful. What's wrong with
> >> >>> >>>> it?
> >> >>> >>>> I submit job with
> >> >>> >>>> .bin/spark-submit --class Myclass \
> >> >>> >>>>     --master yarn-client \
> >> >>> >>>>     --num-executors 2 \
> >> >>> >>>>     --driver-memory 4g \
> >> >>> >>>>     --executor-memory 4g \
> >> >>> >>>>     --executor-cores 1 \
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>> My codes:
> >> >>> >>>>
> >> >>> >>>>    corpus.cache();
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>     // Cluster the documents into three topics using LDA
> >> >>> >>>>
> >> >>> >>>>     DistributedLDAModel ldaModel = (DistributedLDAModel) new
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>
> LDA().setOptimizer("em").setMaxIterations(iterNumber).setK(topicNumber).run(corpus);
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>     // Output topics. Each is a distribution over words
> (matching
> >> >>> >>>> word
> >> >>> >>>> count vectors)
> >> >>> >>>>
> >> >>> >>>>     System.out.println("Learned topics (as distributions over
> >> >>> >>>> vocab
> >> >>> >>>> of
> >> >>> >>>> " + ldaModel.vocabSize()
> >> >>> >>>>
> >> >>> >>>>         + " words):");
> >> >>> >>>>
> >> >>> >>>>    //Line81, exception here:    Matrix topics =
> >> >>> >>>> ldaModel.topicsMatrix();
> >> >>> >>>>
> >> >>> >>>>     for (int topic = 0; topic < topicNumber; topic++) {
> >> >>> >>>>
> >> >>> >>>>       System.out.print("Topic " + topic + ":");
> >> >>> >>>>
> >> >>> >>>>       for (int word = 0; word < ldaModel.vocabSize(); word++) {
> >> >>> >>>>
> >> >>> >>>>         System.out.print(" " + topics.apply(word, topic));
> >> >>> >>>>
> >> >>> >>>>       }
> >> >>> >>>>
> >> >>> >>>>       System.out.println();
> >> >>> >>>>
> >> >>> >>>>     }
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>     ldaModel.save(sc.sc(), modelPath);
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>> Exception in thread "main" java.lang.IndexOutOfBoundsException:
> >> >>> >>>> (1025,0) not in [-58,58) x [-100,100)
> >> >>> >>>>
> >> >>> >>>>         at
> >> >>> >>>>
> >> >>> >>>>
> breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
> >> >>> >>>>
> >> >>> >>>>         at
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>
> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
> >> >>> >>>>
> >> >>> >>>>         at
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>
> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
> >> >>> >>>>
> >> >>> >>>>         at
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >> >>> >>>>
> >> >>> >>>>         at
> >> >>> >>>>
> >> >>> >>>>
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> >> >>> >>>>
> >> >>> >>>>         at
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>
> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
> >> >>> >>>>
> >> >>> >>>>         at
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>
> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
> >> >>> >>>>
> >> >>> >>>>         at
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>
> com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:81)
> >> >>> >>>>
> >> >>> >>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> >> >>> >>>> Method)
> >> >>> >>>>
> >> >>> >>>>         at
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >> >>> >>>>
> >> >>> >>>>         at
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> >>> >>>>
> >> >>> >>>>         at java.lang.reflect.Method.invoke(Method.java:606)
> >> >>> >>>>
> >> >>> >>>>         at
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
> >> >>> >>>>
> >> >>> >>>>         at
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> >> >>> >>>>
> >> >>> >>>>         at
> >> >>> >>>>
> >> >>> >>>>
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> >> >>> >>>>
> >> >>> >>>>         at
> >> >>> >>>>
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
> >> >>> >>>>
> >> >>> >>>>         at
> >> >>> >>>> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> >> >>> >>>>
> >> >>> >>>> 15/12/23 00:01:16 INFO spark.SparkContext: Invoking stop() from
> >> >>> >>>> shutdown
> >> >>> >>>> hook
> >> >>> >>>>
> >> >>> >>>>
> >> >>> >>>>
> ---------------------------------------------------------------------
> >> >>> >>>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> >> >>> >>>> For additional commands, e-mail: dev-help@spark.apache.org
> >> >>> >>>>
> >> >>> >>>
> >> >>>
> >> >>>
> ---------------------------------------------------------------------
> >> >>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> >> >>> For additional commands, e-mail: dev-help@spark.apache.org
> >> >>>
> >> >>
> >
> >
>

Re: running lda in spark throws exception

Posted by Li Li <fa...@gmail.com>.
> It looks like the problem is the vectors of term counts in the corpus
> are not always the vocabulary size.

Do you mean that some integers do not occur in the corpus?
For example, say my dictionary is 0 - 9 (10 words in total).
If the docs are:
0 2 4 6 8
1 3 5 7 9
then it will be correct.
If the docs are:
0 2 4 6 9
1 3 5 6 7
then 8 does not occur in any document. Will that go wrong?

So the workaround is to preprocess the input and re-encode the terms?
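
For reference, here is a minimal sketch of what "vectors of term counts" would mean for the example above. It assumes each input line is a list of term IDs rather than a list of counts. As far as I understand, MLlib's LDA expects each document as a vector whose length is the vocabulary size and whose i-th entry is the count of term i, so a term that never occurs (8 here) is just a zero entry. The class and method names (TermCounts, toCounts) are hypothetical and not part of Spark; the resulting array would still need to be wrapped with Vectors.dense before it goes into the corpus.

```java
import java.util.Arrays;

public class TermCounts {
    // Turn a document given as a list of term IDs into a count vector
    // of length vocabSize (hypothetical helper, not a Spark API).
    public static double[] toCounts(int[] termIds, int vocabSize) {
        double[] counts = new double[vocabSize];
        for (int id : termIds) {
            if (id < 0 || id >= vocabSize) {
                throw new IllegalArgumentException(
                    "term id " + id + " is outside a vocabulary of size " + vocabSize);
            }
            counts[id] += 1.0;
        }
        return counts;
    }

    public static void main(String[] args) {
        int vocabSize = 10; // dictionary 0 - 9 from the example
        int[][] docs = { {0, 2, 4, 6, 9}, {1, 3, 5, 6, 7} };
        for (int[] doc : docs) {
            // Every printed vector has exactly vocabSize entries.
            System.out.println(Arrays.toString(toCounts(doc, vocabSize)));
        }
    }
}
```

With this encoding both documents have length 10 even though term 8 is absent, which is the property the quoted diagnosis asks for.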

On Thu, Jan 14, 2016 at 6:53 AM, Bryan Cutler <cu...@gmail.com> wrote:
> I was now able to reproduce the exception using the master branch and local
> mode.  It looks like the problem is the vectors of term counts in the corpus
> are not always the vocabulary size.  Once I padded these with zero counts to
> the vocab size, it ran without the exception.
>
> Joseph, I also tried calling describeTopics and noticed that with the
> improper vector size, it will not throw an exception but the term indices
> will start to be incorrect.  For a small number of iterations, it is ok, but
> increasing iterations causes the indices to get larger also.  Maybe that is
> what is going on in the JIRA you linked to?
>
> On Wed, Jan 13, 2016 at 1:17 AM, Li Li <fa...@gmail.com> wrote:
>>
>> I will try Spark 1.6.0 to see whether this is a bug in 1.5.2.
>>
>> On Wed, Jan 13, 2016 at 3:58 PM, Li Li <fa...@gmail.com> wrote:
>> > I have set up a stand alone spark cluster and use the same codes. it
>> > still failed with the same exception
>> > I also preprocessed the data to lines of integers and use the scala
>> > codes of lda example. it still failed.
>> > the codes:
>> >
>> > import org.apache.spark.mllib.clustering.{ LDA, DistributedLDAModel }
>> >
>> > import org.apache.spark.mllib.linalg.Vectors
>> >
>> > import org.apache.spark.SparkContext
>> >
>> > import org.apache.spark.SparkContext._
>> >
>> > import org.apache.spark.SparkConf
>> >
>> >
>> > object TestLDA {
>> >
>> >   def main(args: Array[String]) {
>> >
>> >     if(args.length!=4){
>> >
>> >       println("need 4 args: inDir iterNum topicNum outDir")
>> >
>> >       System.exit(-1)
>> >
>> >     }
>> >
>> >     val conf = new SparkConf().setAppName("TestLDA")
>> >
>> >     val sc = new SparkContext(conf)
>> >
>> >     // Load and parse the data
>> >
>> >     val data = sc.textFile(args(0))
>> >
>> >     val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
>> >
>> >     // Index documents with unique IDs
>> >
>> >     val corpus = parsedData.zipWithIndex.map(_.swap).cache()
>> >
>> >     val topicNum=Integer.valueOf(args(2))
>> >
>> >     val iterNum=Integer.valueOf(args(1))
>> >
>> >     // Cluster the documents into three topics using LDA
>> >
>> >     val ldaModel = new
>> > LDA().setK(topicNum).setMaxIterations(iterNum).run(corpus)
>> >
>> >
>> >     // Output topics. Each is a distribution over words (matching word
>> > count vectors)
>> >
>> >     println("Learned topics (as distributions over vocab of " +
>> > ldaModel.vocabSize + " words):")
>> >
>> >     val topics = ldaModel.topicsMatrix
>> >
>> >     for (topic <- Range(0, topicNum)) {
>> >
>> >       print("Topic " + topic + ":")
>> >
>> >       for (word <- Range(0, ldaModel.vocabSize)) { print(" " +
>> > topics(word, topic)); }
>> >
>> >       println()
>> >
>> >     }
>> >
>> >
>> >     // Save and load model.
>> >
>> >     ldaModel.save(sc, args(3))
>> >
>> >   }
>> >
>> >
>> > }
>> >
>> > scripts to submit:
>> >
>> > ~/spark-1.5.2-bin-hadoop2.6/bin/spark-submit --class
>> > com.mobvoi.knowledgegraph.scala_test.TestLDA \
>> >
>> >     --master spark://master:7077 \
>> >
>> >     --num-executors 10 \
>> >
>> >     --executor-memory 4g \
>> >
>> >     --executor-cores 3 \
>> >
>> >     scala_test-1.0-jar-with-dependencies.jar \
>> >
>> >     /test.txt \
>> >
>> >     100 \
>> >
>> >     5  \
>> >
>> >     /lda_model
>> >
>> > test.txt is in attachment
>> >
>> >
>> > On Sat, Jan 9, 2016 at 6:21 AM, Bryan Cutler <cu...@gmail.com> wrote:
>> >> Hi Li,
>> >>
>> >> I tried out your code and sample data in both local mode and Spark
>> >> Standalone and it ran correctly with output that looks good.  Sorry, I
>> >> don't
>> >> have a YARN cluster setup right now, so maybe the error you are seeing
>> >> is
>> >> specific to that.  Btw, I am running the latest Spark code from the
>> >> master
>> >> branch.  Hope that helps some!
>> >>
>> >> Bryan
>> >>
>> >> On Mon, Jan 4, 2016 at 8:42 PM, Li Li <fa...@gmail.com> wrote:
>> >>>
>> >>> anyone could help? the problem is very easy to reproduce. What's
>> >>> wrong?
>> >>>
>> >>> On Wed, Dec 30, 2015 at 8:59 PM, Li Li <fa...@gmail.com> wrote:
>> >>> > I use a small data and reproduce the problem.
>> >>> > But I don't know my codes are correct or not because I am not
>> >>> > familiar
>> >>> > with spark.
>> >>> > So I first post my codes here. If it's correct, then I will post the
>> >>> > data.
>> >>> > one line of my data like:
>> >>> >
>> >>> > { "time":"08-09-17","cmtUrl":"2094361"
>> >>> >
>> >>> >
>> >>> > ,"rvId":"rev_10000020","webpageUrl":"http://www.dianping.com/shop/2094361","word_vec":[0,1,2,3,4,5,6,2,7,8,9
>> >>> >
>> >>> >
>> >>> > ,10,11,12,13,14,15,16,8,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,32,35,36,37,38,15,39,40,41,42,5,43,44,17,45,46,42,47,26,48,49]}
>> >>> >
>> >>> > it's a json file which contains webpageUrl and word_vec which is the
>> >>> > encoded words.
>> >>> > The first step is to parse the input rdd into an rdd of VectorUrl.
>> >>> > BTW, if public VectorUrl call(String s) return null, is it ok?
>> >>> > Then follow the example Index documents with unique IDs
>> >>> > Then I create a rdd to map id to url so after lda training, I can
>> >>> > find
>> >>> > the url of the document. Then save this rdd to hdfs.
>> >>> > Then create corpus rdd and train
>> >>> >
>> >>> > The exception stack is
>> >>> >
>> >>> > 15/12/30 20:45:42 ERROR yarn.ApplicationMaster: User class threw
>> >>> > exception: java.lang.IndexOutOfBoundsException: (454,0) not in
>> >>> > [-58,58) x [-100,100)
>> >>> > java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x
>> >>> > [-100,100)
>> >>> > at
>> >>> > breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
>> >>> > at
>> >>> >
>> >>> > org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
>> >>> > at
>> >>> >
>> >>> > org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
>> >>> > at
>> >>> >
>> >>> > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> >>> > at
>> >>> > scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> >>> > at
>> >>> >
>> >>> > org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
>> >>> > at
>> >>> >
>> >>> > org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
>> >>> > at
>> >>> >
>> >>> > com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:89)
>> >>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>> > at
>> >>> >
>> >>> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >>> > at
>> >>> >
>> >>> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>> > at java.lang.reflect.Method.invoke(Method.java:606)
>> >>> > at
>> >>> >
>> >>> > org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
>> >>> >
>> >>> >
>> >>> > ==========here is my codes==============
>> >>> >
>> >>> > SparkConf conf = new
>> >>> > SparkConf().setAppName(ReviewLDA.class.getName());
>> >>> >
>> >>> >     JavaSparkContext sc = new JavaSparkContext(conf);
>> >>> >
>> >>> >
>> >>> >     // Load and parse the data
>> >>> >
>> >>> >     JavaRDD<String> data = sc.textFile(inputDir + "/*");
>> >>> >
>> >>> >     JavaRDD<VectorUrl> parsedData = data.map(new Function<String,
>> >>> > VectorUrl>() {
>> >>> >
>> >>> >       public VectorUrl call(String s) {
>> >>> >
>> >>> >         JsonParser parser = new JsonParser();
>> >>> >
>> >>> >         JsonObject jo = parser.parse(s).getAsJsonObject();
>> >>> >
>> >>> >         if (!jo.has("word_vec") || !jo.has("webpageUrl")) {
>> >>> >
>> >>> >           return null;
>> >>> >
>> >>> >         }
>> >>> >
>> >>> >         JsonArray word_vec = jo.get("word_vec").getAsJsonArray();
>> >>> >
>> >>> >         String url = jo.get("webpageUrl").getAsString();
>> >>> >
>> >>> >         double[] values = new double[word_vec.size()];
>> >>> >
>> >>> >         for (int i = 0; i < values.length; i++)
>> >>> >
>> >>> >           values[i] = word_vec.get(i).getAsInt();
>> >>> >
>> >>> >         return new VectorUrl(Vectors.dense(values), url);
>> >>> >
>> >>> >       }
>> >>> >
>> >>> >     });
>> >>> >
>> >>> >
>> >>> >
>> >>> >     // Index documents with unique IDs
>> >>> >
>> >>> >     JavaPairRDD<Long, VectorUrl> id2doc =
>> >>> > JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
>> >>> >
>> >>> >         new Function<Tuple2<VectorUrl, Long>, Tuple2<Long,
>> >>> > VectorUrl>>()
>> >>> > {
>> >>> >
>> >>> >           public Tuple2<Long, VectorUrl> call(Tuple2<VectorUrl,
>> >>> > Long>
>> >>> > doc_id) {
>> >>> >
>> >>> >             return doc_id.swap();
>> >>> >
>> >>> >           }
>> >>> >
>> >>> >         }));
>> >>> >
>> >>> >     JavaPairRDD<Long, String> id2Url =
>> >>> > JavaPairRDD.fromJavaRDD(id2doc
>> >>> >
>> >>> >         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long,
>> >>> > String>>() {
>> >>> >
>> >>> >           @Override
>> >>> >
>> >>> >           public Tuple2<Long, String> call(Tuple2<Long, VectorUrl>
>> >>> > id2doc) throws Exception {
>> >>> >
>> >>> >             return new Tuple2(id2doc._1, id2doc._2.url);
>> >>> >
>> >>> >           }
>> >>> >
>> >>> >         }));
>> >>> >
>> >>> >     id2Url.saveAsTextFile(id2UrlPath);
>> >>> >
>> >>> >     JavaPairRDD<Long, Vector> corpus =
>> >>> > JavaPairRDD.fromJavaRDD(id2doc
>> >>> >
>> >>> >         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long,
>> >>> > Vector>>() {
>> >>> >
>> >>> >           @Override
>> >>> >
>> >>> >           public Tuple2<Long, Vector> call(Tuple2<Long, VectorUrl>
>> >>> > id2doc) throws Exception {
>> >>> >
>> >>> >             return new Tuple2(id2doc._1, id2doc._2.vec);
>> >>> >
>> >>> >           }
>> >>> >
>> >>> >         }));
>> >>> >
>> >>> >     corpus.cache();
>> >>> >
>> >>> >
>> >>> >     // Cluster the documents into three topics using LDA
>> >>> >
>> >>> >     DistributedLDAModel ldaModel = (DistributedLDAModel) new
>> >>> > LDA().setMaxIterations(iterNumber)
>> >>> >
>> >>> >         .setK(topicNumber).run(corpus);
>> >>> >
>> >>> > On Wed, Dec 30, 2015 at 3:34 PM, Li Li <fa...@gmail.com> wrote:
>> >>> >> I will use a portion of data and try. will the hdfs block affect
>> >>> >> spark?(if so, it's hard to reproduce)
>> >>> >>
>> >>> >> On Wed, Dec 30, 2015 at 3:22 AM, Joseph Bradley
>> >>> >> <jo...@databricks.com>
>> >>> >> wrote:
>> >>> >>> Hi Li,
>> >>> >>>
>> >>> >>> I'm wondering if you're running into the same bug reported here:
>> >>> >>> https://issues.apache.org/jira/browse/SPARK-12488
>> >>> >>>
>> >>> >>> I haven't figured out yet what is causing it.  Do you have a small
>> >>> >>> corpus
>> >>> >>> which reproduces this error, and which you can share on the JIRA?
>> >>> >>> If
>> >>> >>> so,
>> >>> >>> that would help a lot in debugging this failure.
>> >>> >>>
>> >>> >>> Thanks!
>> >>> >>> Joseph
>> >>> >>>
>> >>> >>> On Sun, Dec 27, 2015 at 7:26 PM, Li Li <fa...@gmail.com>
>> >>> >>> wrote:
>> >>> >>>>
>> >>> >>>> I ran my lda example in a yarn 2.6.2 cluster with spark 1.5.2.
>> >>> >>>> it throws exception in line:   Matrix topics =
>> >>> >>>> ldaModel.topicsMatrix();
>> >>> >>>> But in yarn job history ui, it's successful. What's wrong with
>> >>> >>>> it?
>> >>> >>>> I submit job with
>> >>> >>>> .bin/spark-submit --class Myclass \
>> >>> >>>>     --master yarn-client \
>> >>> >>>>     --num-executors 2 \
>> >>> >>>>     --driver-memory 4g \
>> >>> >>>>     --executor-memory 4g \
>> >>> >>>>     --executor-cores 1 \
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> My codes:
>> >>> >>>>
>> >>> >>>>    corpus.cache();
>> >>> >>>>
>> >>> >>>>
>> >>> >>>>     // Cluster the documents into three topics using LDA
>> >>> >>>>
>> >>> >>>>     DistributedLDAModel ldaModel = (DistributedLDAModel) new
>> >>> >>>>
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> LDA().setOptimizer("em").setMaxIterations(iterNumber).setK(topicNumber).run(corpus);
>> >>> >>>>
>> >>> >>>>
>> >>> >>>>     // Output topics. Each is a distribution over words (matching
>> >>> >>>> word
>> >>> >>>> count vectors)
>> >>> >>>>
>> >>> >>>>     System.out.println("Learned topics (as distributions over
>> >>> >>>> vocab
>> >>> >>>> of
>> >>> >>>> " + ldaModel.vocabSize()
>> >>> >>>>
>> >>> >>>>         + " words):");
>> >>> >>>>
>> >>> >>>>    //Line81, exception here:    Matrix topics =
>> >>> >>>> ldaModel.topicsMatrix();
>> >>> >>>>
>> >>> >>>>     for (int topic = 0; topic < topicNumber; topic++) {
>> >>> >>>>
>> >>> >>>>       System.out.print("Topic " + topic + ":");
>> >>> >>>>
>> >>> >>>>       for (int word = 0; word < ldaModel.vocabSize(); word++) {
>> >>> >>>>
>> >>> >>>>         System.out.print(" " + topics.apply(word, topic));
>> >>> >>>>
>> >>> >>>>       }
>> >>> >>>>
>> >>> >>>>       System.out.println();
>> >>> >>>>
>> >>> >>>>     }
>> >>> >>>>
>> >>> >>>>
>> >>> >>>>     ldaModel.save(sc.sc(), modelPath);
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> Exception in thread "main" java.lang.IndexOutOfBoundsException:
>> >>> >>>> (1025,0) not in [-58,58) x [-100,100)
>> >>> >>>>
>> >>> >>>>         at
>> >>> >>>>
>> >>> >>>> breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
>> >>> >>>>
>> >>> >>>>         at
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
>> >>> >>>>
>> >>> >>>>         at
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
>> >>> >>>>
>> >>> >>>>         at
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> >>> >>>>
>> >>> >>>>         at
>> >>> >>>>
>> >>> >>>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> >>> >>>>
>> >>> >>>>         at
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
>> >>> >>>>
>> >>> >>>>         at
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
>> >>> >>>>
>> >>> >>>>         at
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:81)
>> >>> >>>>
>> >>> >>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>> >>> >>>> Method)
>> >>> >>>>
>> >>> >>>>         at
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >>> >>>>
>> >>> >>>>         at
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>> >>>>
>> >>> >>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>> >>> >>>>
>> >>> >>>>         at
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
>> >>> >>>>
>> >>> >>>>         at
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>> >>> >>>>
>> >>> >>>>         at
>> >>> >>>>
>> >>> >>>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>> >>> >>>>
>> >>> >>>>         at
>> >>> >>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>> >>> >>>>
>> >>> >>>>         at
>> >>> >>>> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> >>> >>>>
>> >>> >>>> 15/12/23 00:01:16 INFO spark.SparkContext: Invoking stop() from
>> >>> >>>> shutdown
>> >>> >>>> hook
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> ---------------------------------------------------------------------
>> >>> >>>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
>> >>> >>>> For additional commands, e-mail: dev-help@spark.apache.org
>> >>> >>>>
>> >>> >>>
>> >>>
>> >>> ---------------------------------------------------------------------
>> >>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
>> >>> For additional commands, e-mail: dev-help@spark.apache.org
>> >>>
>> >>
>
>



Re: running lda in spark throws exception

Posted by Bryan Cutler <cu...@gmail.com>.
I was now able to reproduce the exception using the master branch and local
mode.  It looks like the problem is the vectors of term counts in the
corpus are not always the vocabulary size.  Once I padded these with zero
counts to the vocab size, it ran without the exception.
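
The zero-padding described above could look roughly like the sketch below. It is plain Java with no Spark dependency, and it assumes the vectors are already term counts and that a shorter vector is only missing trailing zero entries. The names PadCounts, maxLength and padTo are hypothetical, and taking the longest vector as the vocabulary size is an assumption for illustration; a real job would use the known dictionary size.

```java
import java.util.Arrays;

public class PadCounts {
    // Longest vector in the corpus, used here as a stand-in for the vocabulary size.
    public static int maxLength(double[][] docs) {
        int max = 0;
        for (double[] doc : docs) {
            max = Math.max(max, doc.length);
        }
        return max;
    }

    // Return a copy of counts extended with zero counts up to vocabSize.
    public static double[] padTo(double[] counts, int vocabSize) {
        if (counts.length > vocabSize) {
            throw new IllegalArgumentException(
                "vector of length " + counts.length + " exceeds vocabulary size " + vocabSize);
        }
        // Arrays.copyOf fills the new trailing positions with 0.0.
        return Arrays.copyOf(counts, vocabSize);
    }

    public static void main(String[] args) {
        double[][] docs = { {1.0, 0.0, 2.0}, {3.0}, {0.0, 1.0, 0.0, 0.0, 4.0} };
        int vocabSize = maxLength(docs);
        for (double[] doc : docs) {
            System.out.println(Arrays.toString(padTo(doc, vocabSize)));
        }
    }
}
```

Each padded array could then be passed to Vectors.dense so that every document in the corpus has the same length.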

Joseph, I also tried calling describeTopics and noticed that with the
improper vector size, it will not throw an exception but the term indices
will start to be incorrect.  For a small number of iterations, it is ok,
but increasing iterations causes the indices to get larger also.  Maybe
that is what is going on in the JIRA you linked to?

On Wed, Jan 13, 2016 at 1:17 AM, Li Li <fa...@gmail.com> wrote:

> I will try Spark 1.6.0 to see whether this is a bug in 1.5.2.
>
> On Wed, Jan 13, 2016 at 3:58 PM, Li Li <fa...@gmail.com> wrote:
> > I have set up a stand alone spark cluster and use the same codes. it
> > still failed with the same exception
> > I also preprocessed the data to lines of integers and use the scala
> > codes of lda example. it still failed.
> > the codes:
> >
> > import org.apache.spark.mllib.clustering.{ LDA, DistributedLDAModel }
> >
> > import org.apache.spark.mllib.linalg.Vectors
> >
> > import org.apache.spark.SparkContext
> >
> > import org.apache.spark.SparkContext._
> >
> > import org.apache.spark.SparkConf
> >
> >
> > object TestLDA {
> >
> >   def main(args: Array[String]) {
> >
> >     if(args.length!=4){
> >
> >       println("need 4 args: inDir iterNum topicNum outDir")
> >
> >       System.exit(-1)
> >
> >     }
> >
> >     val conf = new SparkConf().setAppName("TestLDA")
> >
> >     val sc = new SparkContext(conf)
> >
> >     // Load and parse the data
> >
> >     val data = sc.textFile(args(0))
> >
> >     val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
> >
> >     // Index documents with unique IDs
> >
> >     val corpus = parsedData.zipWithIndex.map(_.swap).cache()
> >
> >     val topicNum=Integer.valueOf(args(2))
> >
> >     val iterNum=Integer.valueOf(args(1))
> >
> >     // Cluster the documents into three topics using LDA
> >
> >     val ldaModel = new
> > LDA().setK(topicNum).setMaxIterations(iterNum).run(corpus)
> >
> >
> >     // Output topics. Each is a distribution over words (matching word
> > count vectors)
> >
> >     println("Learned topics (as distributions over vocab of " +
> > ldaModel.vocabSize + " words):")
> >
> >     val topics = ldaModel.topicsMatrix
> >
> >     for (topic <- Range(0, topicNum)) {
> >
> >       print("Topic " + topic + ":")
> >
> >       for (word <- Range(0, ldaModel.vocabSize)) { print(" " +
> > topics(word, topic)); }
> >
> >       println()
> >
> >     }
> >
> >
> >     // Save and load model.
> >
> >     ldaModel.save(sc, args(3))
> >
> >   }
> >
> >
> > }
> >
> > scripts to submit:
> >
> > ~/spark-1.5.2-bin-hadoop2.6/bin/spark-submit --class
> > com.mobvoi.knowledgegraph.scala_test.TestLDA \
> >
> >     --master spark://master:7077 \
> >
> >     --num-executors 10 \
> >
> >     --executor-memory 4g \
> >
> >     --executor-cores 3 \
> >
> >     scala_test-1.0-jar-with-dependencies.jar \
> >
> >     /test.txt \
> >
> >     100 \
> >
> >     5  \
> >
> >     /lda_model
> >
> > test.txt is in attachment
> >
> >
> > On Sat, Jan 9, 2016 at 6:21 AM, Bryan Cutler <cu...@gmail.com> wrote:
> >> Hi Li,
> >>
> >> I tried out your code and sample data in both local mode and Spark
> >> Standalone and it ran correctly with output that looks good.  Sorry, I
> don't
> >> have a YARN cluster setup right now, so maybe the error you are seeing
> is
> >> specific to that.  Btw, I am running the latest Spark code from the
> master
> >> branch.  Hope that helps some!
> >>
> >> Bryan
> >>
> >> On Mon, Jan 4, 2016 at 8:42 PM, Li Li <fa...@gmail.com> wrote:
> >>>
> >>> anyone could help? the problem is very easy to reproduce. What's wrong?
> >>>
> >>> On Wed, Dec 30, 2015 at 8:59 PM, Li Li <fa...@gmail.com> wrote:
> >>> > I use a small data and reproduce the problem.
> >>> > But I don't know my codes are correct or not because I am not
> familiar
> >>> > with spark.
> >>> > So I first post my codes here. If it's correct, then I will post the
> >>> > data.
> >>> > one line of my data like:
> >>> >
> >>> > { "time":"08-09-17","cmtUrl":"2094361"
> >>> >
> >>> > ,"rvId":"rev_10000020","webpageUrl":"http://www.dianping.com/shop/2094361","word_vec":[0,1,2,3,4,5,6,2,7,8,9
> >>> >
> >>> >
> ,10,11,12,13,14,15,16,8,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,32,35,36,37,38,15,39,40,41,42,5,43,44,17,45,46,42,47,26,48,49]}
> >>> >
> >>> > it's a json file which contains webpageUrl and word_vec which is the
> >>> > encoded words.
> >>> > The first step is to parse the input rdd into an rdd of VectorUrl.
> >>> > BTW, if public VectorUrl call(String s) return null, is it ok?
> >>> > Then follow the example Index documents with unique IDs
> >>> > Then I create a rdd to map id to url so after lda training, I can
> find
> >>> > the url of the document. Then save this rdd to hdfs.
> >>> > Then create corpus rdd and train
> >>> >
> >>> > The exception stack is
> >>> >
> >>> > 15/12/30 20:45:42 ERROR yarn.ApplicationMaster: User class threw
> >>> > exception: java.lang.IndexOutOfBoundsException: (454,0) not in
> >>> > [-58,58) x [-100,100)
> >>> > java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x
> >>> > [-100,100)
> >>> > at
> breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
> >>> > at
> >>> >
> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
> >>> > at
> >>> >
> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
> >>> > at
> >>> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >>> > at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> >>> > at
> >>> >
> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
> >>> > at
> >>> >
> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
> >>> > at
> >>> >
> com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:89)
> >>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>> > at
> >>> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >>> > at
> >>> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>> > at java.lang.reflect.Method.invoke(Method.java:606)
> >>> > at
> >>> >
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
> >>> >
> >>> >
> >>> > ==========here is my codes==============
> >>> >
> >>> > SparkConf conf = new
> SparkConf().setAppName(ReviewLDA.class.getName());
> >>> >
> >>> >     JavaSparkContext sc = new JavaSparkContext(conf);
> >>> >
> >>> >
> >>> >     // Load and parse the data
> >>> >
> >>> >     JavaRDD<String> data = sc.textFile(inputDir + "/*");
> >>> >
> >>> >     JavaRDD<VectorUrl> parsedData = data.map(new Function<String,
> >>> > VectorUrl>() {
> >>> >
> >>> >       public VectorUrl call(String s) {
> >>> >
> >>> >         JsonParser parser = new JsonParser();
> >>> >
> >>> >         JsonObject jo = parser.parse(s).getAsJsonObject();
> >>> >
> >>> >         if (!jo.has("word_vec") || !jo.has("webpageUrl")) {
> >>> >
> >>> >           return null;
> >>> >
> >>> >         }
> >>> >
> >>> >         JsonArray word_vec = jo.get("word_vec").getAsJsonArray();
> >>> >
> >>> >         String url = jo.get("webpageUrl").getAsString();
> >>> >
> >>> >         double[] values = new double[word_vec.size()];
> >>> >
> >>> >         for (int i = 0; i < values.length; i++)
> >>> >
> >>> >           values[i] = word_vec.get(i).getAsInt();
> >>> >
> >>> >         return new VectorUrl(Vectors.dense(values), url);
> >>> >
> >>> >       }
> >>> >
> >>> >     });
> >>> >
> >>> >
> >>> >
> >>> >     // Index documents with unique IDs
> >>> >
> >>> >     JavaPairRDD<Long, VectorUrl> id2doc =
> >>> > JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
> >>> >
> >>> >         new Function<Tuple2<VectorUrl, Long>, Tuple2<Long,
> VectorUrl>>()
> >>> > {
> >>> >
> >>> >           public Tuple2<Long, VectorUrl> call(Tuple2<VectorUrl, Long>
> >>> > doc_id) {
> >>> >
> >>> >             return doc_id.swap();
> >>> >
> >>> >           }
> >>> >
> >>> >         }));
> >>> >
> >>> >     JavaPairRDD<Long, String> id2Url = JavaPairRDD.fromJavaRDD(id2doc
> >>> >
> >>> >         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long,
> >>> > String>>() {
> >>> >
> >>> >           @Override
> >>> >
> >>> >           public Tuple2<Long, String> call(Tuple2<Long, VectorUrl>
> >>> > id2doc) throws Exception {
> >>> >
> >>> >             return new Tuple2(id2doc._1, id2doc._2.url);
> >>> >
> >>> >           }
> >>> >
> >>> >         }));
> >>> >
> >>> >     id2Url.saveAsTextFile(id2UrlPath);
> >>> >
> >>> >     JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(id2doc
> >>> >
> >>> >         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long,
> >>> > Vector>>() {
> >>> >
> >>> >           @Override
> >>> >
> >>> >           public Tuple2<Long, Vector> call(Tuple2<Long, VectorUrl>
> >>> > id2doc) throws Exception {
> >>> >
> >>> >             return new Tuple2(id2doc._1, id2doc._2.vec);
> >>> >
> >>> >           }
> >>> >
> >>> >         }));
> >>> >
> >>> >     corpus.cache();
> >>> >
> >>> >
> >>> >     // Cluster the documents into three topics using LDA
> >>> >
> >>> >     DistributedLDAModel ldaModel = (DistributedLDAModel) new
> >>> > LDA().setMaxIterations(iterNumber)
> >>> >
> >>> >         .setK(topicNumber).run(corpus);
> >>> >
> >>> > On Wed, Dec 30, 2015 at 3:34 PM, Li Li <fa...@gmail.com> wrote:
> >>> >> I will use a portion of data and try. will the hdfs block affect
> >>> >> spark?(if so, it's hard to reproduce)
> >>> >>
> >>> >> On Wed, Dec 30, 2015 at 3:22 AM, Joseph Bradley <
> joseph@databricks.com>
> >>> >> wrote:
> >>> >>> Hi Li,
> >>> >>>
> >>> >>> I'm wondering if you're running into the same bug reported here:
> >>> >>> https://issues.apache.org/jira/browse/SPARK-12488
> >>> >>>
> >>> >>> I haven't figured out yet what is causing it.  Do you have a small
> >>> >>> corpus
> >>> >>> which reproduces this error, and which you can share on the JIRA?
> If
> >>> >>> so,
> >>> >>> that would help a lot in debugging this failure.
> >>> >>>
> >>> >>> Thanks!
> >>> >>> Joseph
> >>> >>>
> >>> >>> On Sun, Dec 27, 2015 at 7:26 PM, Li Li <fa...@gmail.com>
> wrote:
> >>> >>>>
> >>> >>>> I ran my lda example in a yarn 2.6.2 cluster with spark 1.5.2.
> >>> >>>> it throws exception in line:   Matrix topics =
> >>> >>>> ldaModel.topicsMatrix();
> >>> >>>> But in yarn job history ui, it's successful. What's wrong with it?
> >>> >>>> I submit job with
> >>> >>>> .bin/spark-submit --class Myclass \
> >>> >>>>     --master yarn-client \
> >>> >>>>     --num-executors 2 \
> >>> >>>>     --driver-memory 4g \
> >>> >>>>     --executor-memory 4g \
> >>> >>>>     --executor-cores 1 \
> >>> >>>>
> >>> >>>>
> >>> >>>> My codes:
> >>> >>>>
> >>> >>>>    corpus.cache();
> >>> >>>>
> >>> >>>>
> >>> >>>>     // Cluster the documents into three topics using LDA
> >>> >>>>
> >>> >>>>     DistributedLDAModel ldaModel = (DistributedLDAModel) new
> >>> >>>>
> >>> >>>>
> >>> >>>>
> LDA().setOptimizer("em").setMaxIterations(iterNumber).setK(topicNumber).run(corpus);
> >>> >>>>
> >>> >>>>
> >>> >>>>     // Output topics. Each is a distribution over words (matching
> >>> >>>> word
> >>> >>>> count vectors)
> >>> >>>>
> >>> >>>>     System.out.println("Learned topics (as distributions over
> vocab
> >>> >>>> of
> >>> >>>> " + ldaModel.vocabSize()
> >>> >>>>
> >>> >>>>         + " words):");
> >>> >>>>
> >>> >>>>    //Line81, exception here:    Matrix topics =
> >>> >>>> ldaModel.topicsMatrix();
> >>> >>>>
> >>> >>>>     for (int topic = 0; topic < topicNumber; topic++) {
> >>> >>>>
> >>> >>>>       System.out.print("Topic " + topic + ":");
> >>> >>>>
> >>> >>>>       for (int word = 0; word < ldaModel.vocabSize(); word++) {
> >>> >>>>
> >>> >>>>         System.out.print(" " + topics.apply(word, topic));
> >>> >>>>
> >>> >>>>       }
> >>> >>>>
> >>> >>>>       System.out.println();
> >>> >>>>
> >>> >>>>     }
> >>> >>>>
> >>> >>>>
> >>> >>>>     ldaModel.save(sc.sc(), modelPath);
> >>> >>>>
> >>> >>>>
> >>> >>>> Exception in thread "main" java.lang.IndexOutOfBoundsException:
> >>> >>>> (1025,0) not in [-58,58) x [-100,100)
> >>> >>>>
> >>> >>>>         at
> >>> >>>>
> breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
> >>> >>>>
> >>> >>>>         at
> >>> >>>>
> >>> >>>>
> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
> >>> >>>>
> >>> >>>>         at
> >>> >>>>
> >>> >>>>
> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
> >>> >>>>
> >>> >>>>         at
> >>> >>>>
> >>> >>>>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >>> >>>>
> >>> >>>>         at
> >>> >>>>
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> >>> >>>>
> >>> >>>>         at
> >>> >>>>
> >>> >>>>
> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
> >>> >>>>
> >>> >>>>         at
> >>> >>>>
> >>> >>>>
> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
> >>> >>>>
> >>> >>>>         at
> >>> >>>>
> >>> >>>>
> com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:81)
> >>> >>>>
> >>> >>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> >>> >>>> Method)
> >>> >>>>
> >>> >>>>         at
> >>> >>>>
> >>> >>>>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >>> >>>>
> >>> >>>>         at
> >>> >>>>
> >>> >>>>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>> >>>>
> >>> >>>>         at java.lang.reflect.Method.invoke(Method.java:606)
> >>> >>>>
> >>> >>>>         at
> >>> >>>>
> >>> >>>>
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
> >>> >>>>
> >>> >>>>         at
> >>> >>>>
> >>> >>>>
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> >>> >>>>
> >>> >>>>         at
> >>> >>>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> >>> >>>>
> >>> >>>>         at
> >>> >>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
> >>> >>>>
> >>> >>>>         at
> >>> >>>> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> >>> >>>>
> >>> >>>> 15/12/23 00:01:16 INFO spark.SparkContext: Invoking stop() from
> >>> >>>> shutdown
> >>> >>>> hook
> >>> >>>>
> >>> >>>>
> ---------------------------------------------------------------------
> >>> >>>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> >>> >>>> For additional commands, e-mail: dev-help@spark.apache.org
> >>> >>>>
> >>> >>>
> >>>
> >>> ---------------------------------------------------------------------
> >>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> >>> For additional commands, e-mail: dev-help@spark.apache.org
> >>>
> >>
>

Re: running lda in spark throws exception

Posted by Li Li <fa...@gmail.com>.
I will try Spark 1.6.0 to see whether this is a bug in 1.5.2.
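
A note on reading the exception, independent of the Spark version. The message "(454,0) not in [-58,58) x [-100,100)" appears to describe a write to row 454 of a matrix with 58 rows and 100 columns. topicsMatrix is documented as a vocabSize x k matrix, so this would suggest that the model's vocabSize was 58 while some term index reached 454. One cheap check, sketched below, is whether every input vector has the same length. The class and method names are hypothetical, and plain Java arrays stand in for MLlib vectors.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic, not part of Spark: reports whether all input rows
// have the same number of entries.
final class VectorLengthCheck {

    // Returns the index of the first row whose length differs from row 0,
    // or -1 when every row has the same length (or the input is empty).
    static int firstMismatch(List<double[]> rows) {
        if (rows.isEmpty()) {
            return -1;
        }
        int expected = rows.get(0).length;
        for (int i = 1; i < rows.size(); i++) {
            if (rows.get(i).length != expected) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<double[]> rows = new ArrayList<double[]>();
        rows.add(new double[58]);
        rows.add(new double[58]);
        rows.add(new double[455]); // a longer document than the first one
        System.out.println(firstMismatch(rows)); // prints 2
    }
}
```

If a check like this reports a mismatch on the real data, the problem would lie in how the vectors are built rather than in a particular Spark release.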

On Wed, Jan 13, 2016 at 3:58 PM, Li Li <fa...@gmail.com> wrote:
> I have set up a standalone Spark cluster and run the same code. It
> still fails with the same exception.
> I also preprocessed the data into lines of integers and used the Scala
> code from the LDA example. It still fails.



Re: running lda in spark throws exception

Posted by Li Li <fa...@gmail.com>.
I have set up a standalone Spark cluster and run the same code. It
still fails with the same exception.
I also preprocessed the data into lines of integers and used the Scala
code from the LDA example. It still fails.
The code:

import org.apache.spark.mllib.clustering.{ LDA, DistributedLDAModel }
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object TestLDA {
  def main(args: Array[String]): Unit = {
    // Argument order matches the submit script below: input, iterations, topics, output
    if (args.length != 4) {
      println("need 4 args: inDir iternum topic outDir")
      System.exit(-1)
    }
    val conf = new SparkConf().setAppName("TestLDA")
    val sc = new SparkContext(conf)

    // Load and parse the data: one document per line, space-separated integers
    val data = sc.textFile(args(0))
    val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))

    // Index documents with unique IDs
    val corpus = parsedData.zipWithIndex.map(_.swap).cache()
    val iterNum = args(1).toInt
    val topicNum = args(2).toInt

    // Cluster the documents into topicNum topics using LDA
    val ldaModel = new LDA().setK(topicNum).setMaxIterations(iterNum).run(corpus)

    // Output topics. Each is a distribution over words (matching word count vectors)
    println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
    val topics = ldaModel.topicsMatrix
    for (topic <- Range(0, topicNum)) {
      print("Topic " + topic + ":")
      for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)) }
      println()
    }

    // Save the model to the output path (the fourth argument)
    ldaModel.save(sc, args(3))
  }
}

Script used to submit:

~/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \
    --class com.mobvoi.knowledgegraph.scala_test.TestLDA \
    --master spark://master:7077 \
    --num-executors 10 \
    --executor-memory 4g \
    --executor-cores 3 \
    scala_test-1.0-jar-with-dependencies.jar \
    /test.txt \
    100 \
    5 \
    /lda_model

test.txt is attached.
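
One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: the MLlib LDA documentation describes its input as term-count vectors over a fixed vocabulary, where the vocabulary size is the length of each vector. The sample word_vec quoted in this thread looks like a sequence of word IDs instead, and it has 58 entries, the same number as the row bound in the exception. If the data really is a list of word IDs per document, a conversion along the following lines would be needed before training. The sketch uses plain Java arrays; the class name, method names, and the choice of vocabSize are hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Spark: turns one document, given as a
// sequence of word IDs, into a fixed-length term-count array.
final class WordCountVectors {

    // Assumption: word IDs are non-negative integers and vocabSize is larger
    // than every ID that occurs anywhere in the corpus.
    static double[] toCounts(int[] wordIds, int vocabSize) {
        double[] counts = new double[vocabSize];
        for (int id : wordIds) {
            if (id < 0 || id >= vocabSize) {
                throw new IllegalArgumentException(
                    "word ID " + id + " outside [0, " + vocabSize + ")");
            }
            counts[id] += 1.0; // position = word ID, value = occurrences
        }
        return counts;
    }

    // Parses one line of whitespace-separated word IDs.
    static int[] parseLine(String line) {
        String trimmed = line.trim();
        if (trimmed.isEmpty()) {
            return new int[0];
        }
        String[] parts = trimmed.split("\\s+");
        int[] ids = new int[parts.length];
        for (int i = 0; i < parts.length; i++) {
            ids[i] = Integer.parseInt(parts[i]);
        }
        return ids;
    }

    public static void main(String[] args) {
        double[] counts = toCounts(parseLine("0 1 2 2 5"), 6);
        System.out.println(Arrays.toString(counts));
        // prints [1.0, 1.0, 2.0, 0.0, 0.0, 1.0]
    }
}
```

Each resulting array has the same length, so it could be wrapped with Vectors.dense (or a sparse equivalent for large vocabularies) and paired with a document ID as in the code above.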


On Sat, Jan 9, 2016 at 6:21 AM, Bryan Cutler <cu...@gmail.com> wrote:
> Hi Li,
>
> I tried out your code and sample data in both local mode and Spark
> Standalone and it ran correctly with output that looks good.  Sorry, I don't
> have a YARN cluster setup right now, so maybe the error you are seeing is
> specific to that.  Btw, I am running the latest Spark code from the master
> branch.  Hope that helps some!
>
> Bryan
>
> On Mon, Jan 4, 2016 at 8:42 PM, Li Li <fa...@gmail.com> wrote:
>>
>> Could anyone help? The problem is very easy to reproduce. What's wrong?
>>
>> On Wed, Dec 30, 2015 at 8:59 PM, Li Li <fa...@gmail.com> wrote:
>> > I reproduced the problem with a small data set.
>> > But I don't know whether my code is correct, because I am not familiar
>> > with Spark.
>> > So I am posting my code here first. If it is correct, I will post the
>> > data.
>> > One line of my data looks like:
>> >
>> > { "time":"08-09-17","cmtUrl":"2094361","rvId":"rev_10000020",
>> > "webpageUrl":"http://www.dianping.com/shop/2094361",
>> > "word_vec":[0,1,2,3,4,5,6,2,7,8,9,10,11,12,13,14,15,16,8,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,32,35,36,37,38,15,39,40,41,42,5,43,44,17,45,46,42,47,26,48,49]}
>> >
>> > It's a JSON file; each line contains webpageUrl and word_vec, which holds
>> > the encoded words.
>> > The first step is to parse the input RDD into an RDD of VectorUrl.
>> > BTW, is it OK if public VectorUrl call(String s) returns null?
>> > Then I follow the example's "Index documents with unique IDs" step.
>> > Then I create an RDD that maps id to url, so that after LDA training I can
>> > find the url of each document, and I save this RDD to HDFS.
>> > Then I create the corpus RDD and train.
>> >
>> > The exception stack is
>> >
>> > 15/12/30 20:45:42 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100)
>> > java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100)
>> > at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
>> > at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
>> > at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
>> > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> > at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
>> > at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
>> > at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:89)
>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> > at java.lang.reflect.Method.invoke(Method.java:606)
>> > at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
>> >
>> >
>> > ========== here is my code ==========
>> >
>> > SparkConf conf = new SparkConf().setAppName(ReviewLDA.class.getName());
>> > JavaSparkContext sc = new JavaSparkContext(conf);
>> >
>> > // Load and parse the data
>> > JavaRDD<String> data = sc.textFile(inputDir + "/*");
>> > JavaRDD<VectorUrl> parsedData = data.map(new Function<String, VectorUrl>() {
>> >   public VectorUrl call(String s) {
>> >     JsonParser parser = new JsonParser();
>> >     JsonObject jo = parser.parse(s).getAsJsonObject();
>> >     if (!jo.has("word_vec") || !jo.has("webpageUrl")) {
>> >       return null;
>> >     }
>> >     JsonArray word_vec = jo.get("word_vec").getAsJsonArray();
>> >     String url = jo.get("webpageUrl").getAsString();
>> >     double[] values = new double[word_vec.size()];
>> >     for (int i = 0; i < values.length; i++)
>> >       values[i] = word_vec.get(i).getAsInt();
>> >     return new VectorUrl(Vectors.dense(values), url);
>> >   }
>> > });
>> >
>> > // Index documents with unique IDs
>> > JavaPairRDD<Long, VectorUrl> id2doc = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
>> >     new Function<Tuple2<VectorUrl, Long>, Tuple2<Long, VectorUrl>>() {
>> >       public Tuple2<Long, VectorUrl> call(Tuple2<VectorUrl, Long> doc_id) {
>> >         return doc_id.swap();
>> >       }
>> >     }));
>> >
>> > // Map each document id to its url and save the mapping
>> > JavaPairRDD<Long, String> id2Url = JavaPairRDD.fromJavaRDD(id2doc
>> >     .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, String>>() {
>> >       @Override
>> >       public Tuple2<Long, String> call(Tuple2<Long, VectorUrl> id2doc) throws Exception {
>> >         return new Tuple2<Long, String>(id2doc._1, id2doc._2.url);
>> >       }
>> >     }));
>> > id2Url.saveAsTextFile(id2UrlPath);
>> >
>> > // Build the corpus of (id, vector) pairs
>> > JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(id2doc
>> >     .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, Vector>>() {
>> >       @Override
>> >       public Tuple2<Long, Vector> call(Tuple2<Long, VectorUrl> id2doc) throws Exception {
>> >         return new Tuple2<Long, Vector>(id2doc._1, id2doc._2.vec);
>> >       }
>> >     }));
>> > corpus.cache();
>> >
>> > // Cluster the documents into topicNumber topics using LDA
>> > DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA().setMaxIterations(iterNumber)
>> >     .setK(topicNumber).run(corpus);
>> >
>> > On Wed, Dec 30, 2015 at 3:34 PM, Li Li <fa...@gmail.com> wrote:
>> >> I will try with a portion of the data. Will the HDFS blocks affect
>> >> Spark? (If so, it will be hard to reproduce.)
>> >>
>> >> On Wed, Dec 30, 2015 at 3:22 AM, Joseph Bradley <jo...@databricks.com>
>> >> wrote:
>> >>> Hi Li,
>> >>>
>> >>> I'm wondering if you're running into the same bug reported here:
>> >>> https://issues.apache.org/jira/browse/SPARK-12488
>> >>>
>> >>> I haven't figured out yet what is causing it.  Do you have a small corpus
>> >>> which reproduces this error, and which you can share on the JIRA?  If so,
>> >>> that would help a lot in debugging this failure.
>> >>>
>> >>> Thanks!
>> >>> Joseph
>> >>>
>> >>> On Sun, Dec 27, 2015 at 7:26 PM, Li Li <fa...@gmail.com> wrote:
>> >>>>
>> >>>> I ran my lda example in a yarn 2.6.2 cluster with spark 1.5.2.
>> >>>> it throws exception in line:   Matrix topics =
>> >>>> ldaModel.topicsMatrix();
>> >>>> But in yarn job history ui, it's successful. What's wrong with it?
>> >>>> I submit job with
>> >>>> .bin/spark-submit --class Myclass \
>> >>>>     --master yarn-client \
>> >>>>     --num-executors 2 \
>> >>>>     --driver-memory 4g \
>> >>>>     --executor-memory 4g \
>> >>>>     --executor-cores 1 \
>> >>>>
>> >>>>
>> >>>> My codes:
>> >>>>
>> >>>>    corpus.cache();
>> >>>>
>> >>>>
>> >>>>     // Cluster the documents into three topics using LDA
>> >>>>
>> >>>>     DistributedLDAModel ldaModel = (DistributedLDAModel) new
>> >>>>
>> >>>>
>> >>>> LDA().setOptimizer("em").setMaxIterations(iterNumber).setK(topicNumber).run(corpus);
>> >>>>
>> >>>>
>> >>>>     // Output topics. Each is a distribution over words (matching
>> >>>> word
>> >>>> count vectors)
>> >>>>
>> >>>>     System.out.println("Learned topics (as distributions over vocab
>> >>>> of
>> >>>> " + ldaModel.vocabSize()
>> >>>>
>> >>>>         + " words):");
>> >>>>
>> >>>>    //Line81, exception here:    Matrix topics =
>> >>>> ldaModel.topicsMatrix();
>> >>>>
>> >>>>     for (int topic = 0; topic < topicNumber; topic++) {
>> >>>>
>> >>>>       System.out.print("Topic " + topic + ":");
>> >>>>
>> >>>>       for (int word = 0; word < ldaModel.vocabSize(); word++) {
>> >>>>
>> >>>>         System.out.print(" " + topics.apply(word, topic));
>> >>>>
>> >>>>       }
>> >>>>
>> >>>>       System.out.println();
>> >>>>
>> >>>>     }
>> >>>>
>> >>>>
>> >>>>     ldaModel.save(sc.sc(), modelPath);
>> >>>>
>> >>>>
>> >>>> Exception in thread "main" java.lang.IndexOutOfBoundsException:
>> >>>> (1025,0) not in [-58,58) x [-100,100)
>> >>>>
>> >>>>         at
>> >>>> breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
>> >>>>
>> >>>>         at
>> >>>>
>> >>>> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
>> >>>>
>> >>>>         at
>> >>>>
>> >>>> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
>> >>>>
>> >>>>         at
>> >>>>
>> >>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> >>>>
>> >>>>         at
>> >>>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> >>>>
>> >>>>         at
>> >>>>
>> >>>> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
>> >>>>
>> >>>>         at
>> >>>>
>> >>>> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
>> >>>>
>> >>>>         at
>> >>>>
>> >>>> com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:81)
>> >>>>
>> >>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>> >>>> Method)
>> >>>>
>> >>>>         at
>> >>>>
>> >>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >>>>
>> >>>>         at
>> >>>>
>> >>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>>>
>> >>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>> >>>>
>> >>>>         at
>> >>>>
>> >>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
>> >>>>
>> >>>>         at
>> >>>>
>> >>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>> >>>>
>> >>>>         at
>> >>>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>> >>>>
>> >>>>         at
>> >>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>> >>>>
>> >>>>         at
>> >>>> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> >>>>
>> >>>> 15/12/23 00:01:16 INFO spark.SparkContext: Invoking stop() from
>> >>>> shutdown
>> >>>> hook
>> >>>>
>> >>>> ---------------------------------------------------------------------
>> >>>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
>> >>>> For additional commands, e-mail: dev-help@spark.apache.org
>> >>>>
>> >>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
>> For additional commands, e-mail: dev-help@spark.apache.org
>>
>

Re: running lda in spark throws exception

Posted by Bryan Cutler <cu...@gmail.com>.
Hi Li,

I tried out your code and sample data in both local mode and Spark
Standalone, and it ran correctly with output that looks good.  Sorry, I
don't have a YARN cluster set up right now, so maybe the error you are
seeing is specific to that.  Btw, I am running the latest Spark code from
the master branch.  Hope that helps some!

Bryan


Re: running lda in spark throws exception

Posted by Li Li <fa...@gmail.com>.
Could anyone help? The problem is very easy to reproduce. What's wrong?




Re: running lda in spark throws exception

Posted by Li Li <fa...@gmail.com>.
I reproduced the problem with a small data set.
But I don't know whether my code is correct, because I am not familiar
with Spark.
So I will post my code here first. If it is correct, I will then post the data.
One line of my data looks like this:

{ "time":"08-09-17","cmtUrl":"2094361"
,"rvId":"rev_10000020","webpageUrl":"http://www.dianping.com/shop/2094361","word_vec":[0,1,2,3,4,5,6,2,7,8,9
    ,10,11,12,13,14,15,16,8,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,32,35,36,37,38,15,39,40,41,42,5,43,44,17,45,46,42,47,26,48,49]}

It's a JSON file in which each line contains webpageUrl and word_vec, the
encoded words.
The first step is to parse the input RDD into an RDD of VectorUrl.
BTW, is it OK if public VectorUrl call(String s) returns null?
Then I follow the "Index documents with unique IDs" step of the example.
Then I create an RDD that maps ID to URL, so that after LDA training I can
find the URL of each document, and I save this RDD to HDFS.
Then I create the corpus RDD and train.
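
On the question of returning null from call(): as far as I know, map keeps a
null return value as an ordinary element, so any later step that dereferences
it (for example reading the url field) would fail. Below is a minimal sketch in
plain Java, with no Spark dependency, of parsing records and dropping the ones
that come back null. Doc, parse and the "url=" record format are hypothetical
stand-ins for illustration, not the poster's VectorUrl class or JSON input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class DropUnparsable {
    /** Hypothetical stand-in for VectorUrl: only the URL is modelled. */
    static final class Doc {
        final String url;
        Doc(String url) { this.url = url; }
    }

    /** Returns null when the required field is missing, like the poster's call(). */
    static Doc parse(String line) {
        int at = line.indexOf("url=");
        return at < 0 ? null : new Doc(line.substring(at + 4));
    }

    /** Parses every line and keeps only the non-null results. */
    static List<Doc> parseAll(List<String> lines) {
        List<Doc> docs = new ArrayList<Doc>();
        for (String line : lines) {
            Doc d = parse(line);
            if (d != null) {   // on an RDD this would be a filter step after map
                docs.add(d);
            }
        }
        return docs;
    }

    public static void main(String[] args) {
        List<Doc> docs = parseAll(Arrays.asList("url=a", "bad record", "url=b"));
        System.out.println(docs.size() + " of 3 records kept");
    }
}
```

Filtering before zipWithIndex would also keep the document IDs contiguous over
the records that are actually used.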

The exception stack is

15/12/30 20:45:42 ERROR yarn.ApplicationMaster: User class threw
exception: java.lang.IndexOutOfBoundsException: (454,0) not in
[-58,58) x [-100,100)
java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x [-100,100)
at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
at org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
at org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
at com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:89)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
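
A note on reading the message: Breeze prints the matrix shape as rows x
columns, so the topics matrix being filled is 58 x 100 and the write to row 454
falls outside it. The sample word_vec shown earlier in this message has exactly
58 entries, and the MLlib LDA documentation describes each document as a
term-count vector whose length is the vocabulary size. One possible reading,
not confirmed in this thread, is that the vocabulary size was taken from the
length of a document vector while other documents are longer. The sketch below
(plain Java, hypothetical class and method names, made-up data) shows the kind
of check that would expose such a mismatch if word_vec holds word IDs.

```java
import java.util.Arrays;
import java.util.List;

public final class VocabCheck {
    /** Smallest vocabulary size that can index every word ID in the corpus. */
    static int requiredVocabSize(List<int[]> docs) {
        int max = -1;
        for (int[] doc : docs) {
            for (int id : doc) {
                max = Math.max(max, id);
            }
        }
        return max + 1;
    }

    public static void main(String[] args) {
        // Two made-up documents given as sequences of word IDs.
        List<int[]> docs = Arrays.asList(new int[] {0, 1, 2, 2}, new int[] {3, 454});
        int lengthOfFirst = docs.get(0).length;   // size a length-based guess would use
        int required = requiredVocabSize(docs);   // size the IDs actually need
        System.out.println("first document length: " + lengthOfFirst);
        System.out.println("required vocabulary size: " + required);
    }
}
```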


==========here is my code==============

    SparkConf conf = new SparkConf().setAppName(ReviewLDA.class.getName());
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Load and parse the data
    JavaRDD<String> data = sc.textFile(inputDir + "/*");
    JavaRDD<VectorUrl> parsedData = data.map(new Function<String, VectorUrl>() {
      public VectorUrl call(String s) {
        JsonParser parser = new JsonParser();
        JsonObject jo = parser.parse(s).getAsJsonObject();
        // Records missing either field come back as null elements of the RDD
        if (!jo.has("word_vec") || !jo.has("webpageUrl")) {
          return null;
        }
        JsonArray word_vec = jo.get("word_vec").getAsJsonArray();
        String url = jo.get("webpageUrl").getAsString();
        // One vector entry per element of word_vec, holding that element's value
        double[] values = new double[word_vec.size()];
        for (int i = 0; i < values.length; i++)
          values[i] = word_vec.get(i).getAsInt();
        return new VectorUrl(Vectors.dense(values), url);
      }
    });

    // Index documents with unique IDs
    JavaPairRDD<Long, VectorUrl> id2doc = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
        new Function<Tuple2<VectorUrl, Long>, Tuple2<Long, VectorUrl>>() {
          public Tuple2<Long, VectorUrl> call(Tuple2<VectorUrl, Long> doc_id) {
            return doc_id.swap();
          }
        }));

    // Map each document ID to its URL and save the mapping
    JavaPairRDD<Long, String> id2Url = JavaPairRDD.fromJavaRDD(id2doc
        .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, String>>() {
          @Override
          public Tuple2<Long, String> call(Tuple2<Long, VectorUrl> id2doc) throws Exception {
            return new Tuple2<Long, String>(id2doc._1, id2doc._2.url);
          }
        }));
    id2Url.saveAsTextFile(id2UrlPath);

    // Build the (ID, vector) corpus for LDA
    JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(id2doc
        .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long, Vector>>() {
          @Override
          public Tuple2<Long, Vector> call(Tuple2<Long, VectorUrl> id2doc) throws Exception {
            return new Tuple2<Long, Vector>(id2doc._1, id2doc._2.vec);
          }
        }));
    corpus.cache();

    // Cluster the documents into topicNumber topics using LDA
    DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA().setMaxIterations(iterNumber)
        .setK(topicNumber).run(corpus);
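
If word_vec holds word IDs in order of appearance (an assumption; the message
only calls it "the encoded words"), then Vectors.dense(values) produces one
entry per token, so vectors differ in length from document to document, whereas
MLlib's LDA expects term-count vectors of one fixed vocabulary size. The
following is a rough sketch in plain Java of turning a list of word IDs into
sparse term counts. The class, the Sparse holder and the vocabSize parameter
are hypothetical names for illustration; the vocabulary size is assumed to be
known in advance.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

public final class TermCounts {
    /** Sparse term counts: vector size plus parallel arrays of word IDs and counts. */
    static final class Sparse {
        final int size;
        final int[] indices;
        final double[] values;
        Sparse(int size, int[] indices, double[] values) {
            this.size = size;
            this.indices = indices;
            this.values = values;
        }
    }

    /** Counts how often each word ID occurs; IDs must lie in [0, vocabSize). */
    static Sparse fromWordIds(int[] wordIds, int vocabSize) {
        // TreeMap keeps the IDs sorted, so the index array comes out increasing.
        TreeMap<Integer, Integer> counts = new TreeMap<Integer, Integer>();
        for (int id : wordIds) {
            if (id < 0 || id >= vocabSize) {
                throw new IllegalArgumentException("word ID out of range: " + id);
            }
            Integer old = counts.get(id);
            counts.put(id, old == null ? 1 : old + 1);
        }
        int[] indices = new int[counts.size()];
        double[] values = new double[counts.size()];
        int i = 0;
        for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
            indices[i] = e.getKey();
            values[i] = e.getValue();
            i++;
        }
        return new Sparse(vocabSize, indices, values);
    }

    public static void main(String[] args) {
        Sparse s = fromWordIds(new int[] {0, 1, 2, 2, 5}, 6);
        System.out.println(s.size + " " + Arrays.toString(s.indices)
            + " " + Arrays.toString(s.values));
    }
}
```

The three fields line up with the arguments of Vectors.sparse(size, indices,
values) in org.apache.spark.mllib.linalg, so every document would get a vector
of the same length regardless of how many tokens it contains.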




Re: running lda in spark throws exception

Posted by Li Li <fa...@gmail.com>.
I will try to reproduce it with a portion of the data. Could the HDFS block
layout affect Spark here? (If so, it will be hard to reproduce.)
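
When cutting the data down, one thing that may be worth checking is whether
every document vector in the corpus has the same declared size. The sketch
below is plain Java and does not use Spark. The class CorpusCheck, the Doc
type and the method findInconsistentDocs are hypothetical names made up for
illustration. It assumes that the model takes its vocabulary size from the
first document's vector, which is an assumption about Spark 1.5 internals and
has not been confirmed in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CorpusCheck {
    /** Hypothetical stand-in for one document's sparse word-count vector. */
    static final class Doc {
        final long id;
        final int size;       // declared vector length (vocabulary size)
        final int[] termIds;  // indices of the non-zero entries

        Doc(long id, int size, int[] termIds) {
            this.id = id;
            this.size = size;
            this.termIds = termIds;
        }
    }

    /**
     * Returns the ids of documents whose declared size differs from the first
     * document's, or whose term indices fall outside that size.
     */
    static List<Long> findInconsistentDocs(List<Doc> corpus) {
        List<Long> bad = new ArrayList<>();
        if (corpus.isEmpty()) {
            return bad;
        }
        // Assumption: the first document decides the expected vocabulary size.
        final int expected = corpus.get(0).size;
        for (Doc d : corpus) {
            boolean outOfRange =
                Arrays.stream(d.termIds).anyMatch(t -> t < 0 || t >= expected);
            if (d.size != expected || outOfRange) {
                bad.add(d.id);
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        // Made-up data: document 1 declares a larger vocabulary than the others.
        List<Doc> corpus = Arrays.asList(
            new Doc(0L, 58, new int[] {3, 17}),
            new Doc(1L, 2000, new int[] {5, 1025}),
            new Doc(2L, 58, new int[] {57}));
        System.out.println(findInconsistentDocs(corpus)); // prints [1]
    }
}
```

If a check like this flags any documents, the first document plus a few of
the flagged ones might already be a small enough corpus to share.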

On Wed, Dec 30, 2015 at 3:22 AM, Joseph Bradley <jo...@databricks.com> wrote:
> Hi Li,
>
> I'm wondering if you're running into the same bug reported here:
> https://issues.apache.org/jira/browse/SPARK-12488
>
> I haven't figured out yet what is causing it.  Do you have a small corpus
> which reproduces this error, and which you can share on the JIRA?  If so,
> that would help a lot in debugging this failure.
>
> Thanks!
> Joseph
>



Re: running lda in spark throws exception

Posted by Joseph Bradley <jo...@databricks.com>.
Hi Li,

I'm wondering if you're running into the same bug reported here:
https://issues.apache.org/jira/browse/SPARK-12488

I haven't figured out yet what is causing it.  Do you have a small corpus
which reproduces this error, and which you can share on the JIRA?  If so,
that would help a lot in debugging this failure.
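
For anyone reading the trace in the original post, the exception text seems
to follow the pattern "(row,col) not in [-rows,rows) x [-cols,cols)". Under
that reading, the code asked for row 1025, column 0 of a 58 x 100 matrix.
Since the posted loop calls topics.apply(word, topic), rows would be words
and columns topics. The sketch below only parses the message under that
assumed format. The class BoundsMessage and its methods are hypothetical
helpers, not part of Spark or Breeze, and this is a reading of the message
rather than a diagnosis.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BoundsMessage {
    // Assumed message shape: "(row,col) not in [-rows,rows) x [-cols,cols)"
    private static final Pattern SHAPE = Pattern.compile(
        "\\((-?\\d+),(-?\\d+)\\) not in \\[-(\\d+),(\\d+)\\) x \\[-(\\d+),(\\d+)\\)");

    /** Returns {row, col, rows, cols} parsed from the message. */
    static int[] parse(String message) {
        Matcher m = SHAPE.matcher(message);
        if (!m.find()) {
            throw new IllegalArgumentException("unrecognized message: " + message);
        }
        return new int[] {
            Integer.parseInt(m.group(1)),  // requested row
            Integer.parseInt(m.group(2)),  // requested column
            Integer.parseInt(m.group(3)),  // number of rows
            Integer.parseInt(m.group(5))   // number of columns
        };
    }

    /** True if the row index lies in the half-open range [-rows, rows). */
    static boolean rowInRange(int row, int rows) {
        return row >= -rows && row < rows;
    }

    public static void main(String[] args) {
        int[] p = parse("(1025,0) not in [-58,58) x [-100,100)");
        System.out.println(
            "requested (" + p[0] + "," + p[1] + ") in a " + p[2] + " x " + p[3] + " matrix");
        System.out.println("row in range: " + rowInRange(p[0], p[2]));
    }
}
```

Comparing those two numbers with ldaModel.vocabSize() and topicNumber from
the job might show whether the row count matches the vocabulary the corpus
was built with.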

Thanks!
Joseph
