Posted to user@spark.apache.org by Makoto Yui <yu...@gmail.com> on 2014/06/17 14:32:02 UTC

news20-binary classification with LogisticRegressionWithSGD

Hello,

I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on
Hadoop 0.20.2-cdh3u6, but it does not work for a sparse dataset even though
the number of training examples used in the evaluation is just 1,000.

It works fine for the dataset *news20.binary.1000*, which has 178,560
features. However, it does not work for *news20.random.1000*, where the
number of features is large (1,354,731), even though we used sparse vectors
through MLUtils.loadLibSVMFile().

The execution does not seem to progress, and no error is reported in the
spark-shell or in the stdout/stderr of the executors.

We used 32 executors, each allocating 7GB (2GB of which is for RDD storage)
of working memory.

Any suggestions? Your help is really appreciated.

==============
Executed code
==============
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

//val training = MLUtils.loadLibSVMFile(sc,
"hdfs://host:8020/dataset/news20-binary/news20.binary.1000",
multiclass=false)
val training = MLUtils.loadLibSVMFile(sc,
"hdfs://host:8020/dataset/news20-binary/news20.random.1000",
multiclass=false)

val numFeatures = training.take(1)(0).features.size
//numFeatures: Int = 178560 for news20.binary.1000
//numFeatures: Int = 1354731 for news20.random.1000
val model = LogisticRegressionWithSGD.train(training, numIterations=1)

==================================
The dataset used in the evaluation
==================================

http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary

$ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' >
news20.binary.1000
$ sort -R news20.binary > news20.random
$ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' >
news20.random.1000

You can find the dataset in
https://dl.dropboxusercontent.com/u/13123103/news20.random.1000
https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000


Thanks,
Makoto

Re: news20-binary classification with LogisticRegressionWithSGD

Posted by DB Tsai <db...@stanford.edu>.
Hi Xiangrui,

What is the difference between treeAggregate and aggregate? Why does
treeAggregate scale better? If we just use mapPartitions, will it be as
fast as treeAggregate?

Thanks.

Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Tue, Jun 17, 2014 at 12:58 PM, Xiangrui Meng <me...@gmail.com> wrote:
> Hi Makoto,
>
> How many partitions did you set? If there are too many partitions,
> please do a coalesce before calling ML algorithms.
>
> Btw, could you try the tree branch in my repo?
> https://github.com/mengxr/spark/tree/tree
>
> I used tree aggregate in this branch. It should help with the scalability.
>
> Best,
> Xiangrui
>
> On Tue, Jun 17, 2014 at 12:22 PM, Makoto Yui <yu...@gmail.com> wrote:
>> Here is follow-up to the previous evaluation.
>>
>> "aggregate at GradientDescent.scala:178" never finishes at
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L178
>>
>> We confirmed, by -verbose:gc, that GC is not happening during the aggregate
>> and the cumulative CPU time for the task is increasing little by little.
>>
>> LBFGS also does not work for large # of features (news20.random.1000)
>> though it works fine for small # of features (news20.binary.1000).
>>
>> "aggregate at LBFGS.scala:201" also never finishes at
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201
>>
>> -----------------------------------------------------------------------
>> [Evaluated code for LBFGS]
>>
>> import org.apache.spark.SparkContext
>> import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
>> import org.apache.spark.mllib.linalg.Vectors
>> import org.apache.spark.mllib.util.MLUtils
>> import org.apache.spark.mllib.classification.LogisticRegressionModel
>> import org.apache.spark.mllib.optimization._
>>
>> val data = MLUtils.loadLibSVMFile(sc,
>> "hdfs://dm01:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false)
>> val numFeatures = data.take(1)(0).features.size
>>
>> val training = data.map(x => (x.label, MLUtils.appendBias(x.features))).cache()
>>
>> // Run training algorithm to build the model
>> val numCorrections = 10
>> val convergenceTol = 1e-4
>> val maxNumIterations = 20
>> val regParam = 0.1
>> val initialWeightsWithIntercept = Vectors.dense(new
>> Array[Double](numFeatures + 1))
>>
>> val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
>>   training,
>>   new LogisticGradient(),
>>   new SquaredL2Updater(),
>>   numCorrections,
>>   convergenceTol,
>>   maxNumIterations,
>>   regParam,
>>   initialWeightsWithIntercept)
>> -----------------------------------------------------------------------
>>
>>
>> Thanks,
>> Makoto
>>
>> 2014-06-17 21:32 GMT+09:00 Makoto Yui <yu...@gmail.com>:
>>> Hello,
>>>
>>> I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on
>>> Hadoop 0.20.2-cdh3u6 but it does not work for a sparse dataset though
>>> the number of training examples used in the evaluation is just 1,000.
>>>
>>> It works fine for the dataset *news20.binary.1000* that has 178,560
>>> features. However, it does not work for *news20.random.1000* where # of
>>> features is large  (1,354,731 features) though we used a sparse vector
>>> through MLUtils.loadLibSVMFile().
>>>
>>> The execution seems not progressing while no error is reported in the
>>> spark-shell as well as in the stdout/stderr of executors.
>>>
>>> We used 32 executors with each allocating 7GB (2GB is for RDD) for
>>> working memory.
>>>
>>> Any suggesions? Your help is really appreciated.
>>>
>>> ==============
>>> Executed code
>>> ==============
>>> import org.apache.spark.mllib.util.MLUtils
>>> import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
>>>
>>> //val training = MLUtils.loadLibSVMFile(sc,
>>> "hdfs://host:8020/dataset/news20-binary/news20.binary.1000",
>>> multiclass=false)
>>> val training = MLUtils.loadLibSVMFile(sc,
>>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>>> multiclass=false)
>>>
>>> val numFeatures = training .take(1)(0).features.size
>>> //numFeatures: Int = 178560 for news20.binary.1000
>>> //numFeatures: Int = 1354731 for news20.random.1000
>>> val model = LogisticRegressionWithSGD.train(training, numIterations=1)
>>>
>>> ==================================
>>> The dataset used in the evaluation
>>> ==================================
>>>
>>> http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary
>>>
>>> $ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' >
>>> news20.binary.1000
>>> $ sort -R news20.binary > news20.random
>>> $ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' >
>>> news20.random.1000
>>>
>>> You can find the dataset in
>>> https://dl.dropboxusercontent.com/u/13123103/news20.random.1000
>>> https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000
>>>
>>>
>>> Thanks,
>>> Makoto

Re: news20-binary classification with LogisticRegressionWithSGD

Posted by Makoto Yui <yu...@gmail.com>.
Hi Xiangrui,

(2014/06/18 6:03), Xiangrui Meng wrote:
> Are you using Spark 1.0 or 0.9? Could you go to the executor tab of
> the web UI and check the driver's memory?

I am using Spark 1.0.

588.8 MB is allocated for <driver> RDDs.
I am setting SPARK_DRIVER_MEMORY=2g in the conf/spark-env.sh.

The value allocated for <driver> RDDs in the web UI did not change when 
launching as follows:
$ SPARK_DRIVER_MEMORY=6g bin/spark-shell

I set "-verbose:gc", but full GC (or continuous GCs) did not happen on the 
driver during the aggregate.

Thanks,
Makoto

Re: news20-binary classification with LogisticRegressionWithSGD

Posted by Xiangrui Meng <me...@gmail.com>.
Hi Makoto,

Are you using Spark 1.0 or 0.9? Could you go to the executor tab of
the web UI and check the driver's memory?

treeAggregate is not part of 1.0.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng <me...@gmail.com> wrote:
> Hi DB,
>
> treeReduce (treeAggregate) is a feature I'm testing now. It is a
> compromise between current reduce and butterfly allReduce. The former
> runs in linear time on the number of partitions, the latter introduces
> too many dependencies. treeAggregate with depth = 2 should run in
> O(sqrt(n)) time, where n is the number of partitions. It would be
> great if someone can help test its scalability.
>
> Best,
> Xiangrui
>
> On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui <yu...@gmail.com> wrote:
>> Hi Xiangrui,
>>
>>
>> (2014/06/18 4:58), Xiangrui Meng wrote:
>>>
>>> How many partitions did you set? If there are too many partitions,
>>> please do a coalesce before calling ML algorithms.
>>
>>
>> The training data "news20.random.1000" is small and thus only 2 partitions
>> are used by the default.
>>
>> val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false).
>>
>> We also tried 32 partitions as follows but the aggregate never finishes.
>>
>> val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
>>
>>
>>> Btw, could you try the tree branch in my repo?
>>> https://github.com/mengxr/spark/tree/tree
>>>
>>> I used tree aggregate in this branch. It should help with the scalability.
>>
>>
>> Is treeAggregate itself available on Spark 1.0?
>>
>> I wonder.. Could I test your modification just by running the following code
>> on REPL?
>>
>> -------------------
>> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
>>         .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>>           seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
>> features)) =>
>>             val l = gradient.compute(features, label, weights,
>> Vectors.fromBreeze(grad))
>>             (grad, loss + l)
>>           },
>>           combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
>> (grad2, loss2)) =>
>>             (grad1 += grad2, loss1 + loss2)
>>           }, 2)
>> -------------------------
>>
>> Rebuilding Spark is quite something to do evaluation.
>>
>> Thanks,
>> Makoto

Re: news20-binary classification with LogisticRegressionWithSGD

Posted by Makoto Yui <yu...@gmail.com>.
Hi Xiangrui,

(2014/06/18 8:49), Xiangrui Meng wrote:
> Makoto, dense vectors are used to in aggregation. If you have 32
> partitions and each one sending a dense vector of size 1,354,731 to
> master. Then the driver needs 300M+. That may be the problem.

It seems that this could cause problems for convex optimization over large 
training data, and a merging tree, like allreduce, would help reduce the 
memory requirement (though the time for aggregation might increase).

> Which deploy mode are you using, standalone or local?

Standalone.

Setting --driver-memory 8G did not solve the aggregation problem.
The aggregation never finishes.

`ps aux | grep spark` on master is as follows:

myui      7049 79.3  1.1 8768868 592348 pts/2  Sl+  11:10   0:14 
/usr/java/jdk1.7/bin/java -cp 
::/opt/spark-1.0.0/conf:/opt/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop0.20.2-cdh3u6.jar:/usr/lib/hadoop-0.20/conf 
-XX:MaxPermSize=128m -verbose:gc -XX:+PrintGCDetails 
-XX:+PrintGCTimeStamps -Djava.library.path= -Xms2g -Xmx2g 
org.apache.spark.deploy.SparkSubmit spark-shell --driver-memory 8G 
--class org.apache.spark.repl.Main

myui      5694  2.5  0.5 6868296 292572 pts/2  Sl   10:59   0:17 
/usr/java/jdk1.7/bin/java -cp 
::/opt/spark-1.0.0/conf:/opt/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop0.20.2-cdh3u6.jar:/usr/lib/hadoop-0.20/conf 
-XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m 
-Xmx512m org.apache.spark.deploy.master.Master --ip 10.0.0.1 --port 7077 
--webui-port 8081

----------------------------------------
Exporting SPARK_DAEMON_MEMORY=4g in spark-env.sh did not have an effect on 
the evaluation either.

`ps aux | grep spark`
/usr/java/jdk1.7/bin/java -cp 
::/opt/spark-1.0.0/conf:/opt/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop0.20.2-cdh3u6.jar:/usr/lib/hadoop-0.20/conf 
-XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms4g -Xmx4g 
org.apache.spark.deploy.master.Master --ip 10.0.0.1 --port 7077 
--webui-port 8081
...


Thanks,
Makoto

Re: news20-binary classification with LogisticRegressionWithSGD

Posted by Xiangrui Meng <me...@gmail.com>.
Makoto, please use --driver-memory 8G when you launch spark-shell. -Xiangrui
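
For example (a sketch of the command form only, not a line copied from this
thread):

$ bin/spark-shell --driver-memory 8G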

On Tue, Jun 17, 2014 at 4:49 PM, Xiangrui Meng <me...@gmail.com> wrote:
> DB, Yes, reduce and aggregate are linear.
>
> Makoto, dense vectors are used to in aggregation. If you have 32
> partitions and each one sending a dense vector of size 1,354,731 to
> master. Then the driver needs 300M+. That may be the problem. Which
> deploy mode are you using, standalone or local?
>
> Debasish, there is an old PR for butterfly allreduce. However, it
> doesn't seem to be the right way to go for Spark. I just sent out the
> PR: https://github.com/apache/spark/pull/1110 . This is a WIP and it
> needs more testing before we are confident to merge it. It would be
> great if you can help test it.
>
> Best,
> Xiangrui
>
> On Tue, Jun 17, 2014 at 2:33 PM, Debasish Das <de...@gmail.com> wrote:
>> Xiangrui,
>>
>> Could you point to the JIRA related to tree aggregate ? ...sounds like the
>> allreduce idea...
>>
>> I would definitely like to try it on our dataset...
>>
>> Makoto,
>>
>> I did run pretty big sparse dataset (20M rows, 3M sparse features) and I got
>> 100 iterations of SGD running in 200 seconds...10 executors each with 16 GB
>> memory...
>>
>> Although the best result on the same dataset came out of liblinear and
>> BFGS-L1 out of box...so I did not tune the SGD further on learning rate and
>> other heuristics...it was arnd 5% off...
>>
>> Thanks.
>> Deb
>>
>>
>>
>> On Tue, Jun 17, 2014 at 2:09 PM, DB Tsai <db...@stanford.edu> wrote:
>>>
>>> Hi Xiangrui,
>>>
>>> Does it mean that mapPartition and then reduce shares the same
>>> behavior as aggregate operation which is O(n)?
>>>
>>> Sincerely,
>>>
>>> DB Tsai
>>> -------------------------------------------------------
>>> My Blog: https://www.dbtsai.com
>>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>>
>>>
>>> On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng <me...@gmail.com> wrote:
>>> > Hi DB,
>>> >
>>> > treeReduce (treeAggregate) is a feature I'm testing now. It is a
>>> > compromise between current reduce and butterfly allReduce. The former
>>> > runs in linear time on the number of partitions, the latter introduces
>>> > too many dependencies. treeAggregate with depth = 2 should run in
>>> > O(sqrt(n)) time, where n is the number of partitions. It would be
>>> > great if someone can help test its scalability.
>>> >
>>> > Best,
>>> > Xiangrui
>>> >
>>> > On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui <yu...@gmail.com> wrote:
>>> >> Hi Xiangrui,
>>> >>
>>> >>
>>> >> (2014/06/18 4:58), Xiangrui Meng wrote:
>>> >>>
>>> >>> How many partitions did you set? If there are too many partitions,
>>> >>> please do a coalesce before calling ML algorithms.
>>> >>
>>> >>
>>> >> The training data "news20.random.1000" is small and thus only 2
>>> >> partitions
>>> >> are used by the default.
>>> >>
>>> >> val training = MLUtils.loadLibSVMFile(sc,
>>> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>>> >> multiclass=false).
>>> >>
>>> >> We also tried 32 partitions as follows but the aggregate never
>>> >> finishes.
>>> >>
>>> >> val training = MLUtils.loadLibSVMFile(sc,
>>> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>>> >> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
>>> >>
>>> >>
>>> >>> Btw, could you try the tree branch in my repo?
>>> >>> https://github.com/mengxr/spark/tree/tree
>>> >>>
>>> >>> I used tree aggregate in this branch. It should help with the
>>> >>> scalability.
>>> >>
>>> >>
>>> >> Is treeAggregate itself available on Spark 1.0?
>>> >>
>>> >> I wonder.. Could I test your modification just by running the following
>>> >> code
>>> >> on REPL?
>>> >>
>>> >> -------------------
>>> >> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 +
>>> >> i)
>>> >>         .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>>> >>           seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
>>> >> features)) =>
>>> >>             val l = gradient.compute(features, label, weights,
>>> >> Vectors.fromBreeze(grad))
>>> >>             (grad, loss + l)
>>> >>           },
>>> >>           combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
>>> >> (grad2, loss2)) =>
>>> >>             (grad1 += grad2, loss1 + loss2)
>>> >>           }, 2)
>>> >> -------------------------
>>> >>
>>> >> Rebuilding Spark is quite something to do evaluation.
>>> >>
>>> >> Thanks,
>>> >> Makoto
>>
>>

Re: news20-binary classification with LogisticRegressionWithSGD

Posted by Xiangrui Meng <me...@gmail.com>.
DB, Yes, reduce and aggregate are linear.

Makoto, dense vectors are used in the aggregation. If you have 32
partitions and each one sends a dense vector of size 1,354,731 to the
master, then the driver needs 300M+. That may be the problem. Which
deploy mode are you using, standalone or local?
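
A quick sketch of the arithmetic behind that estimate (assuming 8-byte
doubles; runnable as-is in the spark-shell REPL):

-------------------
// One dense gradient vector per partition is sent back to the driver
// during aggregate(); each vector holds numFeatures doubles.
val numFeatures   = 1354731L   // features in news20.random.1000
val numPartitions = 32L
val bytesPerVector = numFeatures * 8L                 // ~10.8 MB per vector
val totalBytes     = bytesPerVector * numPartitions   // ~347 MB on the driver
println(f"per vector: ${bytesPerVector / 1e6}%.1f MB, total: ${totalBytes / 1e6}%.1f MB")
-------------------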

Debasish, there is an old PR for butterfly allreduce. However, it
doesn't seem to be the right way to go for Spark. I just sent out the
PR: https://github.com/apache/spark/pull/1110. This is a WIP and it
needs more testing before we are confident enough to merge it. It would be
great if you can help test it.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 2:33 PM, Debasish Das <de...@gmail.com> wrote:
> Xiangrui,
>
> Could you point to the JIRA related to tree aggregate ? ...sounds like the
> allreduce idea...
>
> I would definitely like to try it on our dataset...
>
> Makoto,
>
> I did run pretty big sparse dataset (20M rows, 3M sparse features) and I got
> 100 iterations of SGD running in 200 seconds...10 executors each with 16 GB
> memory...
>
> Although the best result on the same dataset came out of liblinear and
> BFGS-L1 out of box...so I did not tune the SGD further on learning rate and
> other heuristics...it was arnd 5% off...
>
> Thanks.
> Deb
>
>
>
> On Tue, Jun 17, 2014 at 2:09 PM, DB Tsai <db...@stanford.edu> wrote:
>>
>> Hi Xiangrui,
>>
>> Does it mean that mapPartition and then reduce shares the same
>> behavior as aggregate operation which is O(n)?
>>
>> Sincerely,
>>
>> DB Tsai
>> -------------------------------------------------------
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>
>>
>> On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng <me...@gmail.com> wrote:
>> > Hi DB,
>> >
>> > treeReduce (treeAggregate) is a feature I'm testing now. It is a
>> > compromise between current reduce and butterfly allReduce. The former
>> > runs in linear time on the number of partitions, the latter introduces
>> > too many dependencies. treeAggregate with depth = 2 should run in
>> > O(sqrt(n)) time, where n is the number of partitions. It would be
>> > great if someone can help test its scalability.
>> >
>> > Best,
>> > Xiangrui
>> >
>> > On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui <yu...@gmail.com> wrote:
>> >> Hi Xiangrui,
>> >>
>> >>
>> >> (2014/06/18 4:58), Xiangrui Meng wrote:
>> >>>
>> >>> How many partitions did you set? If there are too many partitions,
>> >>> please do a coalesce before calling ML algorithms.
>> >>
>> >>
>> >> The training data "news20.random.1000" is small and thus only 2
>> >> partitions
>> >> are used by the default.
>> >>
>> >> val training = MLUtils.loadLibSVMFile(sc,
>> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> >> multiclass=false).
>> >>
>> >> We also tried 32 partitions as follows but the aggregate never
>> >> finishes.
>> >>
>> >> val training = MLUtils.loadLibSVMFile(sc,
>> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> >> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
>> >>
>> >>
>> >>> Btw, could you try the tree branch in my repo?
>> >>> https://github.com/mengxr/spark/tree/tree
>> >>>
>> >>> I used tree aggregate in this branch. It should help with the
>> >>> scalability.
>> >>
>> >>
>> >> Is treeAggregate itself available on Spark 1.0?
>> >>
>> >> I wonder.. Could I test your modification just by running the following
>> >> code
>> >> on REPL?
>> >>
>> >> -------------------
>> >> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 +
>> >> i)
>> >>         .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>> >>           seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
>> >> features)) =>
>> >>             val l = gradient.compute(features, label, weights,
>> >> Vectors.fromBreeze(grad))
>> >>             (grad, loss + l)
>> >>           },
>> >>           combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
>> >> (grad2, loss2)) =>
>> >>             (grad1 += grad2, loss1 + loss2)
>> >>           }, 2)
>> >> -------------------------
>> >>
>> >> Rebuilding Spark is quite something to do evaluation.
>> >>
>> >> Thanks,
>> >> Makoto
>
>

Re: news20-binary classification with LogisticRegressionWithSGD

Posted by Makoto Yui <yu...@gmail.com>.
Xiangrui,

(2014/06/19 23:43), Xiangrui Meng wrote:
> It is because the frame size is not set correctly in executor backend. see spark-1112 . We are going to fix it in v1.0.1 . Did you try the treeAggregate?

Not yet. I will wait for the v1.0.1 release.

Thanks,
Makoto

Re: akka disassociated on GC

Posted by Makoto Yui <yu...@gmail.com>.
Hi Xiangrui,

(2014/07/16 15:05), Xiangrui Meng wrote:
> I don't remember I wrote that but thanks for bringing this issue up!
> There are two important settings to check: 1) driver memory (you can
> see it from the executor tab), 2) number of partitions (try to use
> small number of partitions). I put two PRs to fix the problem:

For the driver memory, I used 16GB/24GB and it was enough for the 
execution (full GC did not happen). I checked it by using the jmap and top 
commands.
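
The checks were roughly of this form, where <driver_pid> stands for the PID
of the spark-shell driver JVM (a sketch, not the exact command lines used):

$ jmap -heap <driver_pid>   # heap configuration and current usage of the driver
$ top -p <driver_pid>       # resident memory of the driver process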

BTW, I found that the required driver memory was oddly proportional to the 
number of tasks/executors. When I used 8GB for the driver memory, I got an 
OOM during task serialization. It could be considered a possible memory 
leak in task serialization, to be addressed in the future.

Each task is about 24MB and the number of tasks/executors is 280.
The size of each task result was about 120MB or so.

> 1) use broadcast in task closure:
> https://github.com/apache/spark/pull/1427

Does this PR reduce the required memory for the driver?

Is there a big difference between explicitly broadcasting the feature 
weights and implicitly serializing them into each task closure?
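
A minimal sketch of the two variants I mean (illustrative only; `data` and
`numFeatures` are from the earlier snippet, and the dot-product body is just
a stand-in for the real gradient computation):

-------------------
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Stand-in for the current model weights.
val weights = Vectors.dense(new Array[Double](numFeatures))

// (a) Implicit: `weights` is captured in the closure and serialized with every task.
val sumImplicit = data.map { p: LabeledPoint =>
  p.features.toArray.zip(weights.toArray).map { case (x, w) => x * w }.sum
}.sum()

// (b) Explicit: ship `weights` to each executor once via a broadcast variable.
val bcWeights = sc.broadcast(weights)
val sumExplicit = data.map { p: LabeledPoint =>
  p.features.toArray.zip(bcWeights.value.toArray).map { case (x, w) => x * w }.sum
}.sum()
-------------------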

> 2) use treeAggregate to get the result:
> https://github.com/apache/spark/pull/1110

treeAggregate would surely reduce the aggregation time and the required 
driver memory. I will test it.

However, the problem I am facing now is an akka connection issue that 
occurs on GC, or under heavy load. So I think the problem would still be 
lurking even if the consumed memory is reduced by treeAggregate.

Best,
Makoto

Re: akka disassociated on GC

Posted by Xiangrui Meng <me...@gmail.com>.
Yes, that's the plan. If you use broadcast, please also make sure
TorrentBroadcastFactory is used, which became the default broadcast
factory very recently. -Xiangrui
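
To pin this explicitly rather than rely on the default, the property can be
set in conf/spark-defaults.conf (a sketch using the Spark 1.0.x property
name, not a line from this thread):

spark.broadcast.factory   org.apache.spark.broadcast.TorrentBroadcastFactory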

On Tue, Jul 22, 2014 at 10:47 PM, Makoto Yui <yu...@gmail.com> wrote:
> Hi Xiangrui,
>
> By using your treeAggregate and broadcast patch, the evaluation has been
> processed successfully.
>
> I expect that these patches are merged in the next major release (v1.1?).
> Without them, it would be hard to use mllib for a large dataset.
>
> Thanks,
> Makoto
>
>
> (2014/07/16 15:05), Xiangrui Meng wrote:
>>
>> Hi Makoto,
>>
>> I don't remember I wrote that but thanks for bringing this issue up!
>> There are two important settings to check: 1) driver memory (you can
>> see it from the executor tab), 2) number of partitions (try to use
>> small number of partitions). I put two PRs to fix the problem:
>>
>> 1) use broadcast in task closure:
>> https://github.com/apache/spark/pull/1427
>> 2) use treeAggregate to get the result:
>> https://github.com/apache/spark/pull/1110
>>
>> They are still under review. Once merged, the problem should be fixed.
>> I will test the KDDB dataset and report back. Thanks!
>>
>> Best,
>> Xiangrui
>>
>> On Tue, Jul 15, 2014 at 10:48 PM, Makoto Yui <yu...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> (2014/06/19 23:43), Xiangrui Meng wrote:
>>>>>
>>>>>
>>>>> The execution was slow for more large KDD cup 2012, Track 2 dataset
>>>>> (235M+ records of 16.7M+ (2^24) sparse features in about 33.6GB) due to
>>>>> the
>>>>> sequential aggregation of dense vectors on a single driver node.
>>>>>
>>>>> It took about 7.6m for aggregation for an iteration.
>>>
>>>
>>>
>>> When running the above test, I got another error at the beginning of the
>>> 2nd
>>> iteration when enabling iterations.
>>>
>>> It works fine for the first iteration but the 2nd iteration always fails.
>>>
>>> It seems that akka connections are suddenly disassociated when GC happens
>>> on
>>> the driver node. Two possible causes can be considered:
>>> 1) The driver is under a heavy load because of GC; so executors cannot
>>> connect to the driver. Changing akka timeout setting did not resolve the
>>> issue.
>>> 2) akka oddly released valid connections on GC.
>>>
>>> I'm using spark 1.0.1 and timeout setting of akka as follows did not
>>> resolve
>>> the problem.
>>>
>>> [spark-defaults.conf]
>>> spark.akka.frameSize     50
>>> spark.akka.timeout       120
>>> spark.akka.askTimeout    120
>>> spark.akka.lookupTimeout 120
>>> spark.akka.heartbeat.pauses     600
>>>
>>> It seems this issue is related to one previously discussed in
>>> http://markmail.org/message/p2i34frtf4iusdfn
>>>
>>> Are there any preferred configurations or workaround for this issue?
>>>
>>> Thanks,
>>> Makoto
>>>
>>> --------------------------------------------
>>> [The error log of the driver]
>>>
>>> 14/07/14 18:11:32 INFO scheduler.TaskSetManager: Serialized task 4.0:117
>>> as
>>> 25300254 bytes in 35 ms
>>> 666.108: [GC [PSYoungGen: 6540914K->975362K(7046784K)]
>>> 12419091K->7792529K(23824000K), 5.2157830 secs] [Times: user=0.00
>>> sys=68.43,
>>> real=5.22 secs]
>>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
>>> SendingConnection
>>> to ConnectionManagerId(dc09.mydomain.org,34565)
>>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
>>> ReceivingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
>>> 14/07/14 18:11:38 INFO client.AppClient$ClientActor: Executor updated:
>>> app-20140714180032-0010/8 is now EXITED (Command exited with code 1)
>>> 14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding
>>> SendingConnectionManagerId not found
>>> 14/07/14 18:11:38 INFO cluster.SparkDeploySchedulerBackend: Executor
>>> app-20140714180032-0010/8 removed: Command exited with code 1
>>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
>>> SendingConnection
>>> to ConnectionManagerId(dc30.mydomain.org,59016)
>>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
>>> ReceivingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
>>> 14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding
>>> SendingConnectionManagerId not found
>>> 672.596: [GC [PSYoungGen: 6642785K->359202K(6059072K)]
>>> 13459952K->8065935K(22836288K), 2.8260220 secs] [Times: user=2.83
>>> sys=33.72,
>>> real=2.83 secs]
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> ReceivingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> SendingConnection
>>> to ConnectionManagerId(dc03.mydomain.org,43278)
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> SendingConnection
>>> to ConnectionManagerId(dc02.mydomain.org,54538)
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> ReceivingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> SendingConnection
>>> to ConnectionManagerId(dc18.mydomain.org,58100)
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> SendingConnection
>>> to ConnectionManagerId(dc18.mydomain.org,58100)
>>>
>>> The full log is uploaded on
>>> https://dl.dropboxusercontent.com/u/13123103/driver.log
>>>
>>> --------------------------------------------
>>> [The error log of a worker]
>>> 14/07/14 18:11:38 INFO worker.Worker: Executor app-20140714180032-0010/8
>>> finished with state EXITED message Command exited with code 1 exitStatus
>>> 1
>>> 14/07/14 18:11:38 INFO actor.LocalActorRef: Message
>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
>>> Actor[akka://sparkWorker/deadLetters] to
>>>
>>> Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.0.1.9%3A60601-39#1322474303]
>>> was not delivered. [13] dead letters encountered. This logging can be
>>> turned
>>> off or adjusted with configuration settings 'akka.log-dead-letters' and
>>> 'akka.log-dead-letters-during-shutdown'.
>>> 14/07/14 18:11:38 ERROR remote.EndpointWriter: AssociationError
>>> [akka.tcp://sparkWorker@dc09.mydomain.org:39578] ->
>>> [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]: Error [Association
>>> failed with [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]] [
>>> akka.remote.EndpointAssociationException: Association failed with
>>> [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]
>>> Caused by:
>>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
>>> Connection refused: dc09.mydomain.org/10.0.1.9:33886]
>>> 14/07/14 18:11:38 INFO worker.Worker: Asked to launch executor
>>> app-20140714180032-0010/32 for Spark shell
>>> 14/07/14 18:11:38 WARN worker.CommandUtils: SPARK_JAVA_OPTS was set on
>>> the
>>> worker. It is deprecated in Spark 1.0.
>>> 14/07/14 18:11:38 WARN worker.CommandUtils: Set SPARK_LOCAL_DIRS for
>>> node-specific storage locations.
>>> 14/07/14 18:11:38 ERROR remote.EndpointWriter: AssociationError
>>> [akka.tcp://sparkWorker@dc09.mydomain.org:39578] ->
>>> [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]: Error [Association
>>> failed with [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]] [
>>> akka.remote.EndpointAssociationException: Association failed with
>>> [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]
>>> Caused by:
>>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
>>> Connection refused: dc09.mydomain.org/10.0.1.9:33886]
>>
>>
>

Re: akka disassociated on GC

Posted by Makoto Yui <yu...@gmail.com>.
Hi Xiangrui,

By using your treeAggregate and broadcast patches, the evaluation has 
completed successfully.

I expect that these patches will be merged in the next major release 
(v1.1?). Without them, it would be hard to use MLlib on a large dataset.

Thanks,
Makoto

(2014/07/16 15:05), Xiangrui Meng wrote:
> Hi Makoto,
>
> I don't remember I wrote that but thanks for bringing this issue up!
> There are two important settings to check: 1) driver memory (you can
> see it from the executor tab), 2) number of partitions (try to use
> small number of partitions). I put two PRs to fix the problem:
>
> 1) use broadcast in task closure: https://github.com/apache/spark/pull/1427
> 2) use treeAggregate to get the result:
> https://github.com/apache/spark/pull/1110
>
> They are still under review. Once merged, the problem should be fixed.
> I will test the KDDB dataset and report back. Thanks!
>
> Best,
> Xiangrui
>
> On Tue, Jul 15, 2014 at 10:48 PM, Makoto Yui <yu...@gmail.com> wrote:
>> Hello,
>>
>> (2014/06/19 23:43), Xiangrui Meng wrote:
>>>>
>>>> The execution was slow for more large KDD cup 2012, Track 2 dataset
>>>> (235M+ records of 16.7M+ (2^24) sparse features in about 33.6GB) due to the
>>>> sequential aggregation of dense vectors on a single driver node.
>>>>
>>>> It took about 7.6m for aggregation for an iteration.
>>
>>
>> When running the above test, I got another error at the beginning of the 2nd
>> iteration when enabling iterations.
>>
>> It works fine for the first iteration but the 2nd iteration always fails.
>>
>> It seems that akka connections are suddenly disassociated when GC happens on
>> the driver node. Two possible causes can be considered:
>> 1) The driver is under a heavy load because of GC; so executors cannot
>> connect to the driver. Changing akka timeout setting did not resolve the
>> issue.
>> 2) akka oddly released valid connections on GC.
>>
>> I'm using spark 1.0.1 and timeout setting of akka as follows did not resolve
>> the problem.
>>
>> [spark-defaults.conf]
>> spark.akka.frameSize     50
>> spark.akka.timeout       120
>> spark.akka.askTimeout    120
>> spark.akka.lookupTimeout 120
>> spark.akka.heartbeat.pauses     600
>>
>> It seems this issue is related to one previously discussed in
>> http://markmail.org/message/p2i34frtf4iusdfn
>>
>> Are there any preferred configurations or workaround for this issue?
>>
>> Thanks,
>> Makoto
>>
>> --------------------------------------------
>> [The error log of the driver]
>>
>> 14/07/14 18:11:32 INFO scheduler.TaskSetManager: Serialized task 4.0:117 as
>> 25300254 bytes in 35 ms
>> 666.108: [GC [PSYoungGen: 6540914K->975362K(7046784K)]
>> 12419091K->7792529K(23824000K), 5.2157830 secs] [Times: user=0.00 sys=68.43,
>> real=5.22 secs]
>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection
>> to ConnectionManagerId(dc09.mydomain.org,34565)
>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
>> ReceivingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
>> 14/07/14 18:11:38 INFO client.AppClient$ClientActor: Executor updated:
>> app-20140714180032-0010/8 is now EXITED (Command exited with code 1)
>> 14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding
>> SendingConnectionManagerId not found
>> 14/07/14 18:11:38 INFO cluster.SparkDeploySchedulerBackend: Executor
>> app-20140714180032-0010/8 removed: Command exited with code 1
>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection
>> to ConnectionManagerId(dc30.mydomain.org,59016)
>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
>> ReceivingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
>> 14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding
>> SendingConnectionManagerId not found
>> 672.596: [GC [PSYoungGen: 6642785K->359202K(6059072K)]
>> 13459952K->8065935K(22836288K), 2.8260220 secs] [Times: user=2.83 sys=33.72,
>> real=2.83 secs]
>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>> ReceivingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
>> to ConnectionManagerId(dc03.mydomain.org,43278)
>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
>> to ConnectionManagerId(dc02.mydomain.org,54538)
>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>> ReceivingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
>> to ConnectionManagerId(dc18.mydomain.org,58100)
>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
>> to ConnectionManagerId(dc18.mydomain.org,58100)
>>
>> The full log is uploaded on
>> https://dl.dropboxusercontent.com/u/13123103/driver.log
>>
>> --------------------------------------------
>> [The error log of a worker]
>> 14/07/14 18:11:38 INFO worker.Worker: Executor app-20140714180032-0010/8
>> finished with state EXITED message Command exited with code 1 exitStatus 1
>> 14/07/14 18:11:38 INFO actor.LocalActorRef: Message
>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
>> Actor[akka://sparkWorker/deadLetters] to
>> Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.0.1.9%3A60601-39#1322474303]
>> was not delivered. [13] dead letters encountered. This logging can be turned
>> off or adjusted with configuration settings 'akka.log-dead-letters' and
>> 'akka.log-dead-letters-during-shutdown'.
>> 14/07/14 18:11:38 ERROR remote.EndpointWriter: AssociationError
>> [akka.tcp://sparkWorker@dc09.mydomain.org:39578] ->
>> [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]: Error [Association
>> failed with [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]] [
>> akka.remote.EndpointAssociationException: Association failed with
>> [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]
>> Caused by:
>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
>> Connection refused: dc09.mydomain.org/10.0.1.9:33886]
>> 14/07/14 18:11:38 INFO worker.Worker: Asked to launch executor
>> app-20140714180032-0010/32 for Spark shell
>> 14/07/14 18:11:38 WARN worker.CommandUtils: SPARK_JAVA_OPTS was set on the
>> worker. It is deprecated in Spark 1.0.
>> 14/07/14 18:11:38 WARN worker.CommandUtils: Set SPARK_LOCAL_DIRS for
>> node-specific storage locations.
>> 14/07/14 18:11:38 ERROR remote.EndpointWriter: AssociationError
>> [akka.tcp://sparkWorker@dc09.mydomain.org:39578] ->
>> [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]: Error [Association
>> failed with [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]] [
>> akka.remote.EndpointAssociationException: Association failed with
>> [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]
>> Caused by:
>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
>> Connection refused: dc09.mydomain.org/10.0.1.9:33886]
>


Re: akka disassociated on GC

Posted by Xiangrui Meng <me...@gmail.com>.
Hi Makoto,

I don't remember writing that, but thanks for bringing this issue up!
There are two important settings to check: 1) driver memory (you can
see it on the executor tab), 2) number of partitions (try to use a
small number of partitions). I put up two PRs to fix the problem:

1) use broadcast in task closure: https://github.com/apache/spark/pull/1427
2) use treeAggregate to get the result:
https://github.com/apache/spark/pull/1110

They are still under review. Once merged, the problem should be fixed.
I will test the KDDB dataset and report back. Thanks!

Best,
Xiangrui

On Tue, Jul 15, 2014 at 10:48 PM, Makoto Yui <yu...@gmail.com> wrote:
> Hello,
>
> (2014/06/19 23:43), Xiangrui Meng wrote:
>>>
>>> The execution was slow for more large KDD cup 2012, Track 2 dataset
>>> (235M+ records of 16.7M+ (2^24) sparse features in about 33.6GB) due to the
>>> sequential aggregation of dense vectors on a single driver node.
>>>
>>> It took about 7.6m for aggregation for an iteration.
>
>
> When running the above test, I got another error at the beginning of the 2nd
> iteration when enabling iterations.
>
> It works fine for the first iteration but the 2nd iteration always fails.
>
> It seems that akka connections are suddenly disassociated when GC happens on
> the driver node. Two possible causes can be considered:
> 1) The driver is under a heavy load because of GC; so executors cannot
> connect to the driver. Changing akka timeout setting did not resolve the
> issue.
> 2) akka oddly released valid connections on GC.
>
> I'm using spark 1.0.1 and timeout setting of akka as follows did not resolve
> the problem.
>
> [spark-defaults.conf]
> spark.akka.frameSize     50
> spark.akka.timeout       120
> spark.akka.askTimeout    120
> spark.akka.lookupTimeout 120
> spark.akka.heartbeat.pauses     600
>
> It seems this issue is related to one previously discussed in
> http://markmail.org/message/p2i34frtf4iusdfn
>
> Are there any preferred configurations or workaround for this issue?
>
> Thanks,
> Makoto
>
> --------------------------------------------
> [The error log of the driver]
>
> 14/07/14 18:11:32 INFO scheduler.TaskSetManager: Serialized task 4.0:117 as
> 25300254 bytes in 35 ms
> 666.108: [GC [PSYoungGen: 6540914K->975362K(7046784K)]
> 12419091K->7792529K(23824000K), 5.2157830 secs] [Times: user=0.00 sys=68.43,
> real=5.22 secs]
> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection
> to ConnectionManagerId(dc09.mydomain.org,34565)
> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
> ReceivingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
> 14/07/14 18:11:38 INFO client.AppClient$ClientActor: Executor updated:
> app-20140714180032-0010/8 is now EXITED (Command exited with code 1)
> 14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding
> SendingConnectionManagerId not found
> 14/07/14 18:11:38 INFO cluster.SparkDeploySchedulerBackend: Executor
> app-20140714180032-0010/8 removed: Command exited with code 1
> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection
> to ConnectionManagerId(dc30.mydomain.org,59016)
> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
> ReceivingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
> 14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding
> SendingConnectionManagerId not found
> 672.596: [GC [PSYoungGen: 6642785K->359202K(6059072K)]
> 13459952K->8065935K(22836288K), 2.8260220 secs] [Times: user=2.83 sys=33.72,
> real=2.83 secs]
> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
> ReceivingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
> to ConnectionManagerId(dc03.mydomain.org,43278)
> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
> to ConnectionManagerId(dc02.mydomain.org,54538)
> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
> ReceivingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
> to ConnectionManagerId(dc18.mydomain.org,58100)
> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
> to ConnectionManagerId(dc18.mydomain.org,58100)
>
> The full log is uploaded on
> https://dl.dropboxusercontent.com/u/13123103/driver.log
>
> --------------------------------------------
> [The error log of a worker]
> 14/07/14 18:11:38 INFO worker.Worker: Executor app-20140714180032-0010/8
> finished with state EXITED message Command exited with code 1 exitStatus 1
> 14/07/14 18:11:38 INFO actor.LocalActorRef: Message
> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
> Actor[akka://sparkWorker/deadLetters] to
> Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.0.1.9%3A60601-39#1322474303]
> was not delivered. [13] dead letters encountered. This logging can be turned
> off or adjusted with configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
> 14/07/14 18:11:38 ERROR remote.EndpointWriter: AssociationError
> [akka.tcp://sparkWorker@dc09.mydomain.org:39578] ->
> [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]: Error [Association
> failed with [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]] [
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: dc09.mydomain.org/10.0.1.9:33886]
> 14/07/14 18:11:38 INFO worker.Worker: Asked to launch executor
> app-20140714180032-0010/32 for Spark shell
> 14/07/14 18:11:38 WARN worker.CommandUtils: SPARK_JAVA_OPTS was set on the
> worker. It is deprecated in Spark 1.0.
> 14/07/14 18:11:38 WARN worker.CommandUtils: Set SPARK_LOCAL_DIRS for
> node-specific storage locations.
> 14/07/14 18:11:38 ERROR remote.EndpointWriter: AssociationError
> [akka.tcp://sparkWorker@dc09.mydomain.org:39578] ->
> [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]: Error [Association
> failed with [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]] [
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: dc09.mydomain.org/10.0.1.9:33886]

akka disassociated on GC

Posted by Makoto Yui <yu...@gmail.com>.
Hello,

(2014/06/19 23:43), Xiangrui Meng wrote:
>> The execution was slow for more large KDD cup 2012, Track 2 dataset (235M+ records of 16.7M+ (2^24) sparse features in about 33.6GB) due to the sequential aggregation of dense vectors on a single driver node.
>>
>> It took about 7.6m for aggregation for an iteration.

When running the above test with multiple iterations enabled, I got another 
error at the beginning of the 2nd iteration.

It works fine for the first iteration but the 2nd iteration always fails.

It seems that akka connections are suddenly disassociated when GC 
happens on the driver node. Two possible causes can be considered:
1) The driver is under heavy load because of GC, so executors cannot 
connect to the driver. Changing the akka timeout settings did not resolve 
the issue.
2) akka oddly released valid connections during GC.

I'm using Spark 1.0.1, and the following akka timeout settings did not 
resolve the problem.

[spark-defaults.conf]
spark.akka.frameSize     50
spark.akka.timeout       120
spark.akka.askTimeout    120
spark.akka.lookupTimeout 120
spark.akka.heartbeat.pauses     600

It seems this issue is related to one previously discussed in
http://markmail.org/message/p2i34frtf4iusdfn

Are there any preferred configurations or workarounds for this issue?

Thanks,
Makoto

--------------------------------------------
[The error log of the driver]

14/07/14 18:11:32 INFO scheduler.TaskSetManager: Serialized task 4.0:117 
as 25300254 bytes in 35 ms
666.108: [GC [PSYoungGen: 6540914K->975362K(7046784K)] 
12419091K->7792529K(23824000K), 5.2157830 secs] [Times: user=0.00 
sys=68.43, real=5.22 secs]
14/07/14 18:11:38 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing 
ReceivingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO client.AppClient$ClientActor: Executor updated: 
app-20140714180032-0010/8 is now EXITED (Command exited with code 1)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding 
SendingConnectionManagerId not found
14/07/14 18:11:38 INFO cluster.SparkDeploySchedulerBackend: Executor 
app-20140714180032-0010/8 removed: Command exited with code 1
14/07/14 18:11:38 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing 
ReceivingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding 
SendingConnectionManagerId not found
672.596: [GC [PSYoungGen: 6642785K->359202K(6059072K)] 
13459952K->8065935K(22836288K), 2.8260220 secs] [Times: user=2.83 
sys=33.72, real=2.83 secs]
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
ReceivingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc02.mydomain.org,54538)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
ReceivingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc18.mydomain.org,58100)

The full log is uploaded on
https://dl.dropboxusercontent.com/u/13123103/driver.log

--------------------------------------------
[The error log of a worker]
14/07/14 18:11:38 INFO worker.Worker: Executor app-20140714180032-0010/8 
finished with state EXITED message Command exited with code 1 exitStatus 1
14/07/14 18:11:38 INFO actor.LocalActorRef: Message 
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] 
from Actor[akka://sparkWorker/deadLetters] to 
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.0.1.9%3A60601-39#1322474303] 
was not delivered. [13] dead letters encountered. This logging can be 
turned off or adjusted with configuration settings 
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14/07/14 18:11:38 ERROR remote.EndpointWriter: AssociationError 
[akka.tcp://sparkWorker@dc09.mydomain.org:39578] -> 
[akka.tcp://sparkExecutor@dc09.mydomain.org:33886]: Error [Association 
failed with [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]] [
akka.remote.EndpointAssociationException: Association failed with 
[akka.tcp://sparkExecutor@dc09.mydomain.org:33886]
Caused by: 
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
Connection refused: dc09.mydomain.org/10.0.1.9:33886]
14/07/14 18:11:38 INFO worker.Worker: Asked to launch executor 
app-20140714180032-0010/32 for Spark shell
14/07/14 18:11:38 WARN worker.CommandUtils: SPARK_JAVA_OPTS was set on 
the worker. It is deprecated in Spark 1.0.
14/07/14 18:11:38 WARN worker.CommandUtils: Set SPARK_LOCAL_DIRS for 
node-specific storage locations.
14/07/14 18:11:38 ERROR remote.EndpointWriter: AssociationError 
[akka.tcp://sparkWorker@dc09.mydomain.org:39578] -> 
[akka.tcp://sparkExecutor@dc09.mydomain.org:33886]: Error [Association 
failed with [akka.tcp://sparkExecutor@dc09.mydomain.org:33886]] [
akka.remote.EndpointAssociationException: Association failed with 
[akka.tcp://sparkExecutor@dc09.mydomain.org:33886]
Caused by: 
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
Connection refused: dc09.mydomain.org/10.0.1.9:33886]

Re: news20-binary classification with LogisticRegressionWithSGD

Posted by Xiangrui Meng <me...@gmail.com>.
It is because the frame size is not set correctly in the executor backend; see SPARK-1112. We are going to fix it in v1.0.1. Did you try treeAggregate?

> On Jun 19, 2014, at 2:01 AM, Makoto Yui <yu...@gmail.com> wrote:
> 
> Xiangrui and Debasish,
> 
> (2014/06/18 6:33), Debasish Das wrote:
>> I did run pretty big sparse dataset (20M rows, 3M sparse features) and I
>> got 100 iterations of SGD running in 200 seconds...10 executors each
>> with 16 GB memory...
> 
> I could figure out what the problem is. "spark.akka.frameSize" was too large. By setting spark.akka.frameSize=10, it worked for the news20 dataset.
> 
> The execution was slow for more large KDD cup 2012, Track 2 dataset (235M+ records of 16.7M+ (2^24) sparse features in about 33.6GB) due to the sequential aggregation of dense vectors on a single driver node.
> 
> It took about 7.6m for aggregation for an iteration.
> 
> Thanks,
> Makoto

Re: news20-binary classification with LogisticRegressionWithSGD

Posted by Makoto Yui <yu...@gmail.com>.
Xiangrui and Debasish,

(2014/06/18 6:33), Debasish Das wrote:
> I did run pretty big sparse dataset (20M rows, 3M sparse features) and I
> got 100 iterations of SGD running in 200 seconds...10 executors each
> with 16 GB memory...

I figured out what the problem is: "spark.akka.frameSize" was too large. 
By setting spark.akka.frameSize=10, it worked for the news20 dataset.
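
For reference, the setting can go in conf/spark-defaults.conf (the value is
in MB; a sketch, not copied from my actual configuration):

spark.akka.frameSize     10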

The execution was slow for the larger KDD Cup 2012, Track 2 dataset 
(235M+ records with 16.7M+ (2^24) sparse features, about 33.6GB in total) 
due to the sequential aggregation of dense vectors on a single driver node.

Aggregation took about 7.6 minutes per iteration.

Thanks,
Makoto

Re: news20-binary classification with LogisticRegressionWithSGD

Posted by Debasish Das <de...@gmail.com>.
Xiangrui,

Could you point to the JIRA related to tree aggregate? ...sounds like the
allreduce idea...

I would definitely like to try it on our dataset...

Makoto,

I did run a pretty big sparse dataset (20M rows, 3M sparse features) and I
got 100 iterations of SGD running in 200 seconds...10 executors, each with
16 GB memory...

Although the best result on the same dataset came out of liblinear and
BFGS-L1 out of the box...so I did not tune the SGD further on learning rate
and other heuristics...it was around 5% off...

Thanks.
Deb



On Tue, Jun 17, 2014 at 2:09 PM, DB Tsai <db...@stanford.edu> wrote:

> Hi Xiangrui,
>
> Does it mean that mapPartition and then reduce shares the same
> behavior as aggregate operation which is O(n)?
>
> Sincerely,
>
> DB Tsai
> -------------------------------------------------------
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng <me...@gmail.com> wrote:
> > Hi DB,
> >
> > treeReduce (treeAggregate) is a feature I'm testing now. It is a
> > compromise between current reduce and butterfly allReduce. The former
> > runs in linear time on the number of partitions, the latter introduces
> > too many dependencies. treeAggregate with depth = 2 should run in
> > O(sqrt(n)) time, where n is the number of partitions. It would be
> > great if someone can help test its scalability.
> >
> > Best,
> > Xiangrui
> >
> > On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui <yu...@gmail.com> wrote:
> >> Hi Xiangrui,
> >>
> >>
> >> (2014/06/18 4:58), Xiangrui Meng wrote:
> >>>
> >>> How many partitions did you set? If there are too many partitions,
> >>> please do a coalesce before calling ML algorithms.
> >>
> >>
> >> The training data "news20.random.1000" is small and thus only 2
> partitions
> >> are used by the default.
> >>
> >> val training = MLUtils.loadLibSVMFile(sc,
> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
> >> multiclass=false).
> >>
> >> We also tried 32 partitions as follows but the aggregate never finishes.
> >>
> >> val training = MLUtils.loadLibSVMFile(sc,
> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
> >> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
> >>
> >>
> >>> Btw, could you try the tree branch in my repo?
> >>> https://github.com/mengxr/spark/tree/tree
> >>>
> >>> I used tree aggregate in this branch. It should help with the
> scalability.
> >>
> >>
> >> Is treeAggregate itself available on Spark 1.0?
> >>
> >> I wonder.. Could I test your modification just by running the following
> code
> >> on REPL?
> >>
> >> -------------------
> >> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 +
> i)
> >>         .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
> >>           seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
> >> features)) =>
> >>             val l = gradient.compute(features, label, weights,
> >> Vectors.fromBreeze(grad))
> >>             (grad, loss + l)
> >>           },
> >>           combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
> >> (grad2, loss2)) =>
> >>             (grad1 += grad2, loss1 + loss2)
> >>           }, 2)
> >> -------------------------
> >>
> >> Rebuilding Spark is quite something to do evaluation.
> >>
> >> Thanks,
> >> Makoto
>

Re: news20-binary classification with LogisticRegressionWithSGD

Posted by DB Tsai <db...@stanford.edu>.
Hi Xiangrui,

Does it mean that mapPartitions followed by reduce shares the same
behavior as the aggregate operation, which is O(n)?

Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng <me...@gmail.com> wrote:
> Hi DB,
>
> treeReduce (treeAggregate) is a feature I'm testing now. It is a
> compromise between current reduce and butterfly allReduce. The former
> runs in linear time on the number of partitions, the latter introduces
> too many dependencies. treeAggregate with depth = 2 should run in
> O(sqrt(n)) time, where n is the number of partitions. It would be
> great if someone can help test its scalability.
>
> Best,
> Xiangrui
>
> On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui <yu...@gmail.com> wrote:
>> Hi Xiangrui,
>>
>>
>> (2014/06/18 4:58), Xiangrui Meng wrote:
>>>
>>> How many partitions did you set? If there are too many partitions,
>>> please do a coalesce before calling ML algorithms.
>>
>>
>> The training data "news20.random.1000" is small and thus only 2 partitions
>> are used by default.
>>
>> val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false)
>>
>> We also tried 32 partitions as follows but the aggregate never finishes.
>>
>> val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
>>
>>
>>> Btw, could you try the tree branch in my repo?
>>> https://github.com/mengxr/spark/tree/tree
>>>
>>> I used tree aggregate in this branch. It should help with the scalability.
>>
>>
>> Is treeAggregate itself available on Spark 1.0?
>>
>> I wonder... Could I test your modification just by running the following code
>> in the REPL?
>>
>> -------------------
>> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
>>         .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>>           seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
>> features)) =>
>>             val l = gradient.compute(features, label, weights,
>> Vectors.fromBreeze(grad))
>>             (grad, loss + l)
>>           },
>>           combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
>> (grad2, loss2)) =>
>>             (grad1 += grad2, loss1 + loss2)
>>           }, 2)
>> -------------------------
>>
>> Rebuilding Spark is quite a lot of work just to run an evaluation.
>>
>> Thanks,
>> Makoto

Re: news20-binary classification with LogisticRegressionWithSGD

Posted by Xiangrui Meng <me...@gmail.com>.
Hi DB,

treeReduce (treeAggregate) is a feature I'm testing now. It is a
compromise between the current reduce and a butterfly allReduce. The former
runs in time linear in the number of partitions, while the latter introduces
too many dependencies. treeAggregate with depth = 2 should run in
O(sqrt(n)) time, where n is the number of partitions. It would be
great if someone could help test its scalability.

Best,
Xiangrui
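
A minimal sketch of the depth = 2 idea, assuming n partition-local partial
results that only need to be summed: the partials are first combined within
groups of roughly sqrt(n), and the sqrt(n) group results are then combined in
a second round, so no single combine step has to touch more than about
sqrt(n) values.

-------------------
// Sketch only: models depth-2 tree aggregation over already-computed
// partition partials (plain Doubles here), outside of Spark itself.
def treeCombine(partials: Seq[Double]): Double = {
  val n = partials.size
  val groupSize = math.max(1, math.ceil(math.sqrt(n)).toInt)
  // First level: combine within groups of ~sqrt(n) partials.
  val groupSums = partials.grouped(groupSize).map(_.sum).toSeq
  // Second level: combine the ~sqrt(n) group results.
  groupSums.sum
}
-------------------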

On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui <yu...@gmail.com> wrote:
> Hi Xiangrui,
>
>
> (2014/06/18 4:58), Xiangrui Meng wrote:
>>
>> How many partitions did you set? If there are too many partitions,
>> please do a coalesce before calling ML algorithms.
>
>
> The training data "news20.random.1000" is small and thus only 2 partitions
> are used by default.
>
> val training = MLUtils.loadLibSVMFile(sc,
> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
> multiclass=false)
>
> We also tried 32 partitions as follows but the aggregate never finishes.
>
> val training = MLUtils.loadLibSVMFile(sc,
> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
>
>
>> Btw, could you try the tree branch in my repo?
>> https://github.com/mengxr/spark/tree/tree
>>
>> I used tree aggregate in this branch. It should help with the scalability.
>
>
> Is treeAggregate itself available on Spark 1.0?
>
> I wonder... Could I test your modification just by running the following code
> in the REPL?
>
> -------------------
> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
>         .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>           seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
> features)) =>
>             val l = gradient.compute(features, label, weights,
> Vectors.fromBreeze(grad))
>             (grad, loss + l)
>           },
>           combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
> (grad2, loss2)) =>
>             (grad1 += grad2, loss1 + loss2)
>           }, 2)
> -------------------------
>
> Rebuilding Spark is quite a lot of work just to run an evaluation.
>
> Thanks,
> Makoto

Re: news20-binary classification with LogisticRegressionWithSGD

Posted by Makoto Yui <yu...@gmail.com>.
Hi Xiangrui,

(2014/06/18 4:58), Xiangrui Meng wrote:
> How many partitions did you set? If there are too many partitions,
> please do a coalesce before calling ML algorithms.

The training data "news20.random.1000" is small and thus only 2 
partitions are used by default.

val training = MLUtils.loadLibSVMFile(sc, 
"hdfs://host:8020/dataset/news20-binary/news20.random.1000", 
multiclass=false)

We also tried 32 partitions as follows but the aggregate never finishes.

val training = MLUtils.loadLibSVMFile(sc, 
"hdfs://host:8020/dataset/news20-binary/news20.random.1000", 
multiclass=false, numFeatures = 1354731, minPartitions = 32)
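
A quick sanity check (a small sketch, assuming the same spark-shell session
and the training val from just above) for how many partitions the loaded RDD
actually ended up with:

-------------------
// Number of partitions backing the RDD; confirms whether minPartitions
// (or a later coalesce) took effect before training.
println(training.partitions.size)
-------------------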

> Btw, could you try the tree branch in my repo?
> https://github.com/mengxr/spark/tree/tree
>
> I used tree aggregate in this branch. It should help with the scalability.

Is treeAggregate itself available on Spark 1.0?

I wonder... Could I test your modification just by running the following
code in the REPL?

-------------------
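// Mini-batch sample, then a depth-2 tree aggregation of (gradient, loss):
// seqOp folds one example into a partition-local (grad, loss) pair (the
// gradient is accumulated in place into grad, and the returned loss is added),
// combOp merges two partials, and the trailing 2 is the tree depth.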
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
         .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
           seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, 
features)) =>
             val l = gradient.compute(features, label, weights, 
Vectors.fromBreeze(grad))
             (grad, loss + l)
           },
           combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), 
(grad2, loss2)) =>
             (grad1 += grad2, loss1 + loss2)
           }, 2)
-------------------------

Rebuilding Spark is quite a lot of work just to run an evaluation.

Thanks,
Makoto

Re: news20-binary classification with LogisticRegressionWithSGD

Posted by Xiangrui Meng <me...@gmail.com>.
Hi Makoto,

How many partitions did you set? If there are too many partitions,
please do a coalesce before calling ML algorithms.

Btw, could you try the tree branch in my repo?
https://github.com/mengxr/spark/tree/tree

I used tree aggregate in this branch. It should help with the scalability.

Best,
Xiangrui
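
A small sketch of the coalesce suggestion, assuming the spark-shell session
(sc) and HDFS path used earlier in the thread; the partition count of 8 is an
arbitrary illustrative choice. The point is simply to shrink the number of
partitions before handing the RDD to the ML algorithm:

-------------------
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

val training = MLUtils.loadLibSVMFile(sc,
  "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
  multiclass=false)

// Shrink to a modest number of partitions so each aggregation only has to
// combine a few partial gradients.
val coalesced = training.coalesce(8).cache()

val model = LogisticRegressionWithSGD.train(coalesced, numIterations=1)
-------------------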

On Tue, Jun 17, 2014 at 12:22 PM, Makoto Yui <yu...@gmail.com> wrote:
> Here is a follow-up to the previous evaluation.
>
> "aggregate at GradientDescent.scala:178" never finishes at
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L178
>
> We confirmed, by -verbose:gc, that GC is not happening during the aggregate
> and the cumulative CPU time for the task is increasing little by little.
>
> LBFGS also does not work for large # of features (news20.random.1000)
> though it works fine for small # of features (news20.binary.1000).
>
> "aggregate at LBFGS.scala:201" also never finishes at
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201
>
> -----------------------------------------------------------------------
> [Evaluated code for LBFGS]
>
> import org.apache.spark.SparkContext
> import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.mllib.util.MLUtils
> import org.apache.spark.mllib.classification.LogisticRegressionModel
> import org.apache.spark.mllib.optimization._
>
> val data = MLUtils.loadLibSVMFile(sc,
> "hdfs://dm01:8020/dataset/news20-binary/news20.random.1000",
> multiclass=false)
> val numFeatures = data.take(1)(0).features.size
>
> val training = data.map(x => (x.label, MLUtils.appendBias(x.features))).cache()
>
> // Run training algorithm to build the model
> val numCorrections = 10
> val convergenceTol = 1e-4
> val maxNumIterations = 20
> val regParam = 0.1
> val initialWeightsWithIntercept = Vectors.dense(new
> Array[Double](numFeatures + 1))
>
> val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
>   training,
>   new LogisticGradient(),
>   new SquaredL2Updater(),
>   numCorrections,
>   convergenceTol,
>   maxNumIterations,
>   regParam,
>   initialWeightsWithIntercept)
> -----------------------------------------------------------------------
>
>
> Thanks,
> Makoto
>
> 2014-06-17 21:32 GMT+09:00 Makoto Yui <yu...@gmail.com>:
>> Hello,
>>
>> I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on
>> Hadoop 0.20.2-cdh3u6 but it does not work for a sparse dataset though
>> the number of training examples used in the evaluation is just 1,000.
>>
>> It works fine for the dataset *news20.binary.1000* that has 178,560
>> features. However, it does not work for *news20.random.1000* where # of
>> features is large  (1,354,731 features) though we used a sparse vector
>> through MLUtils.loadLibSVMFile().
>>
>> The execution does not seem to be progressing, while no error is reported in the
>> spark-shell or in the stdout/stderr of the executors.
>>
>> We used 32 executors with each allocating 7GB (2GB is for RDD) for
>> working memory.
>>
>> Any suggestions? Your help is really appreciated.
>>
>> ==============
>> Executed code
>> ==============
>> import org.apache.spark.mllib.util.MLUtils
>> import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
>>
>> //val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.binary.1000",
>> multiclass=false)
>> val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false)
>>
>> val numFeatures = training.take(1)(0).features.size
>> //numFeatures: Int = 178560 for news20.binary.1000
>> //numFeatures: Int = 1354731 for news20.random.1000
>> val model = LogisticRegressionWithSGD.train(training, numIterations=1)
>>
>> ==================================
>> The dataset used in the evaluation
>> ==================================
>>
>> http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary
>>
>> $ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' >
>> news20.binary.1000
>> $ sort -R news20.binary > news20.random
>> $ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' >
>> news20.random.1000
>>
>> You can find the dataset in
>> https://dl.dropboxusercontent.com/u/13123103/news20.random.1000
>> https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000
>>
>>
>> Thanks,
>> Makoto

Re: news20-binary classification with LogisticRegressionWithSGD

Posted by Makoto Yui <yu...@gmail.com>.
Here is a follow-up to the previous evaluation.

"aggregate at GradientDescent.scala:178" never finishes at
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L178

We confirmed with -verbose:gc that GC is not happening during the aggregate,
and that the cumulative CPU time for the task is increasing little by little.

LBFGS also does not work for a large number of features (news20.random.1000),
though it works fine for a small number of features (news20.binary.1000).

"aggregate at LBFGS.scala:201" also never finishes at
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201

-----------------------------------------------------------------------
[Evaluated code for LBFGS]

import org.apache.spark.SparkContext
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.optimization._

val data = MLUtils.loadLibSVMFile(sc,
"hdfs://dm01:8020/dataset/news20-binary/news20.random.1000",
multiclass=false)
val numFeatures = data.take(1)(0).features.size

val training = data.map(x => (x.label, MLUtils.appendBias(x.features))).cache()

// Run training algorithm to build the model
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new
Array[Double](numFeatures + 1))

val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
  training,
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept)
-----------------------------------------------------------------------
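
Once runLBFGS returns, a possible follow-up (a sketch, assuming the
weightsWithIntercept value produced above) is to split the appended bias term
back out and build the LogisticRegressionModel imported at the top:

-----------------------------------------------------------------------
// The last element of weightsWithIntercept is the intercept; the rest are
// the per-feature weights.
val model = new LogisticRegressionModel(
  Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
  weightsWithIntercept(weightsWithIntercept.size - 1))
-----------------------------------------------------------------------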


Thanks,
Makoto

2014-06-17 21:32 GMT+09:00 Makoto Yui <yu...@gmail.com>:
> Hello,
>
> I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on
> Hadoop 0.20.2-cdh3u6 but it does not work for a sparse dataset though
> the number of training examples used in the evaluation is just 1,000.
>
> It works fine for the dataset *news20.binary.1000* that has 178,560
> features. However, it does not work for *news20.random.1000* where # of
> features is large  (1,354,731 features) though we used a sparse vector
> through MLUtils.loadLibSVMFile().
>
> The execution does not seem to be progressing, while no error is reported in the
> spark-shell or in the stdout/stderr of the executors.
>
> We used 32 executors with each allocating 7GB (2GB is for RDD) for
> working memory.
>
> Any suggestions? Your help is really appreciated.
>
> ==============
> Executed code
> ==============
> import org.apache.spark.mllib.util.MLUtils
> import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
>
> //val training = MLUtils.loadLibSVMFile(sc,
> "hdfs://host:8020/dataset/news20-binary/news20.binary.1000",
> multiclass=false)
> val training = MLUtils.loadLibSVMFile(sc,
> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
> multiclass=false)
>
> val numFeatures = training.take(1)(0).features.size
> //numFeatures: Int = 178560 for news20.binary.1000
> //numFeatures: Int = 1354731 for news20.random.1000
> val model = LogisticRegressionWithSGD.train(training, numIterations=1)
>
> ==================================
> The dataset used in the evaluation
> ==================================
>
> http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary
>
> $ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' >
> news20.binary.1000
> $ sort -R news20.binary > news20.random
> $ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' >
> news20.random.1000
>
> You can find the dataset in
> https://dl.dropboxusercontent.com/u/13123103/news20.random.1000
> https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000
>
>
> Thanks,
> Makoto