Posted to user@spark.apache.org by Jiusheng Chen <ch...@gmail.com> on 2014/09/04 04:18:06 UTC

Re: Is there any way to control the parallelism in LogisticRegression

Hi Xiangrui,

A side question about MLlib.
It looks like the current LBFGS in MLlib (version 1.0.2 and even v1.1) only
supports L2 regularization; the doc explains it: "The L1 regularization by
using L1Updater
<http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.optimization.L1Updater>
will not work since the soft-thresholding logic in L1Updater is designed
for gradient descent."

Since the algorithm comes from Breeze, and I noticed that Breeze actually
supports L1 (OWLQN
<http://www.scalanlp.org/api/breeze/#breeze.optimize.OWLQN>), I am wondering
whether there are special considerations that keep the current MLlib from
supporting OWLQN, and whether there is any plan to add it.

Thanks for your time!
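
(For readers less familiar with the optimizer API being discussed, here is a
minimal sketch of driving L-BFGS directly through
org.apache.spark.mllib.optimization with the L2 updater; the training RDD of
(label, features) pairs, numFeatures, and the hyper-parameter values are
assumptions for illustration only.)

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}

// training: RDD[(Double, Vector)] of (label, features) pairs, assumed to exist.
// Swapping SquaredL2Updater for L1Updater is exactly what the quoted doc warns
// against: its soft-thresholding step assumes gradient-descent updates, not L-BFGS.
val initialWeights = Vectors.dense(new Array[Double](numFeatures)) // numFeatures assumed
val (weights, lossHistory) = LBFGS.runLBFGS(
  training,
  new LogisticGradient(),
  new SquaredL2Updater(),
  10,     // numCorrections
  1e-4,   // convergence tolerance
  100,    // max iterations
  0.1,    // L2 regParam
  initialWeights)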



On Fri, Aug 22, 2014 at 12:56 PM, ZHENG, Xu-dong <do...@gmail.com> wrote:

> Update.
>
> I just found a magic parameter *balanceSlack* in *CoalescedRDD*, which
> sounds like it could control the locality. The default value is 0.1 (a
> smaller value means lower locality). I changed it to 1.0 (full locality),
> used approach #3, and found a big improvement (20%~40%). Although the Web
> UI still shows the task type as 'ANY' and the input as shuffle read, the
> real performance is much better than before changing this parameter.
> [image: Inline image 1]
>
> I think the benefits include:
>
> 1. This approach keeps the physical partition size small, but makes each
> task handle multiple partitions. So the memory requested for
> deserialization is reduced, which also reduces the GC time. That is exactly
> what we observed in our job.
>
> 2. This approach will not hit the 2G limitation, because it does not change
> the partition size.
>
> I also think that Spark may want to change this default value, or at least
> expose this parameter to users (*CoalescedRDD* is a private class, and
> *RDD*.*coalesce* also doesn't have a parameter to control it).
>
> On Wed, Aug 13, 2014 at 12:28 AM, Xiangrui Meng <me...@gmail.com> wrote:
>
>> Sorry, I missed #2. My suggestion is the same as #2. You need to set a
>> bigger numPartitions to avoid hitting the integer bound or the 2G
>> limitation, at the cost of increased shuffle size per iteration. If you
>> use a CombineInputFormat and then cache, it will try to give you roughly
>> the same size per partition. There will be some remote fetches from HDFS,
>> but it is still cheaper than calling RDD.repartition().
>>
>> For coalesce without shuffle, I don't know how to set the right number
>> of partitions either ...
>>
>> -Xiangrui
>>
>> On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong <do...@gmail.com>
>> wrote:
>> > Hi Xiangrui,
>> >
>> > Thanks for your reply!
>> >
>> > Yes, our data is very sparse, but RDD.repartition invokes
>> > RDD.coalesce(numPartitions, shuffle = true) internally, so I think it
>> > has the same effect as #2, right?
>> >
>> > For CombineInputFormat, although I haven't tried it, it sounds like it
>> > will combine multiple partitions into a large partition if I cache it,
>> > so the same issue as #1?
>> >
>> > For coalesce, could you share some best practices on how to set the
>> > right number of partitions to avoid the locality problem?
>> >
>> > Thanks!
>> >
>> >
>> >
>> > On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng <me...@gmail.com>
>> wrote:
>> >>
>> >> Assuming that your data is very sparse, I would recommend
>> >> RDD.repartition. But if it is not the case and you don't want to
>> >> shuffle the data, you can try a CombineInputFormat and then parse the
>> >> lines into labeled points. Coalesce may cause locality problems if you
>> >> don't use the right number of partitions. -Xiangrui
>> >>
>> >> On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong <do...@gmail.com>
>> >> wrote:
>> >> > I think this has the same effect and issue as #1, right?
>> >> >
>> >> >
>> >> > On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen <
>> chenjiusheng@gmail.com>
>> >> > wrote:
>> >> >>
>> >> >> How about increasing the HDFS block size? For example, if the
>> >> >> current value is 128M, we could make it 512M or bigger.
>> >> >>
>> >> >>
>> >> >> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong <dong128@gmail.com>
>> >> >> wrote:
>> >> >>>
>> >> >>> Hi all,
>> >> >>>
>> >> >>> We are trying to use Spark MLlib to train on super large data (100M
>> >> >>> features and 5B rows). The input data in HDFS has ~26K partitions.
>> >> >>> By default, MLlib will create a task for every partition at each
>> >> >>> iteration. But because our dimensionality is also very high, such a
>> >> >>> large number of tasks adds a lot of network overhead to transfer the
>> >> >>> weight vector. So we want to reduce the number of tasks, and we
>> >> >>> tried the ways below:
>> >> >>>
>> >> >>> 1. Coalesce partitions without shuffling, then cache.
>> >> >>>
>> >> >>> data.coalesce(numPartitions).cache()
>> >> >>>
>> >> >>> This works fine for relatively small data, but when the data grows
>> >> >>> and numPartitions is fixed, each partition becomes large. This
>> >> >>> introduces two issues: first, a larger partition needs larger
>> >> >>> objects and more memory at runtime, and triggers GC more
>> >> >>> frequently; second, we hit the 'Size exceeds Integer.MAX_VALUE'
>> >> >>> error, which seems to be caused by a single partition being larger
>> >> >>> than 2G (https://issues.apache.org/jira/browse/SPARK-1391).
>> >> >>>
>> >> >>> 2. Coalesce partitions with shuffling, then cache.
>> >> >>>
>> >> >>> data.coalesce(numPartitions, true).cache()
>> >> >>>
>> >> >>> This mitigates the second issue in #1 to some degree, but the first
>> >> >>> issue is still there, and it also introduces a large amount of
>> >> >>> shuffling.
>> >> >>>
>> >> >>> 3. Cache data first, then coalesce partitions.
>> >> >>>
>> >> >>> data.cache().coalesce(numPartitions)
>> >> >>>
>> >> >>> In this way, the number of cached partitions does not change, but
>> >> >>> each task reads data from multiple partitions. However, I find that
>> >> >>> tasks lose locality this way. I see a lot of 'ANY' tasks, which
>> >> >>> means tasks read data from other nodes and become slower than tasks
>> >> >>> that read data from local memory.
>> >> >>>
>> >> >>> I think the best way should be like #3, but leveraging locality as
>> >> >>> much as possible. Is there any way to do that? Any suggestions?
>> >> >>>
>> >> >>> Thanks!
>> >> >>>
>> >> >>> --
>> >> >>> ZHENG, Xu-dong
>> >> >>>
>> >> >>
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > 郑旭东
>> >> > ZHENG, Xu-dong
>> >> >
>> >
>> >
>> >
>> >
>> > --
>> > 郑旭东
>> > ZHENG, Xu-dong
>> >
>>
>
>
>
> --
> 郑旭东
> ZHENG, Xu-dong
>
>
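
(To make the quoted thread easier to follow, here is a minimal annotated
sketch of the three approaches it discusses; data and numPartitions are
placeholder names.)

// data: RDD[LabeledPoint] read from the ~26K HDFS partitions (placeholder).

// #1: coalesce without shuffle, then cache -- fewer but larger partitions,
//     which is what runs into the GC pressure and the 2G partition limit.
val v1 = data.coalesce(numPartitions).cache()

// #2: coalesce with shuffle, then cache -- more even partition sizes, at the
//     cost of a full shuffle of the input.
val v2 = data.coalesce(numPartitions, shuffle = true).cache()

// #3: cache first, then coalesce -- cached partitions stay small, but each
//     task reads several of them, which is where the 'ANY' locality shows up.
val v3 = data.cache().coalesce(numPartitions)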

Re: Is there any way to control the parallelism in LogisticRegression

Posted by DB Tsai <db...@dbtsai.com>.
Yes. But you need to store the RDD as *serialized* Java objects. See the
section on storage levels:
http://spark.apache.org/docs/latest/programming-guide.html
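
A minimal sketch of what that looks like (the data name is a placeholder):

import org.apache.spark.storage.StorageLevel

// MEMORY_ONLY_SER stores each partition as a serialized byte buffer instead
// of deserialized Java objects, trading some CPU for a smaller footprint.
val training = data.persist(StorageLevel.MEMORY_ONLY_SER)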

Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai



Re: Is there any way to control the parallelism in LogisticRegression

Posted by Jiusheng Chen <ch...@gmail.com>.
Thanks DB.
Did you mean this?

spark.rdd.compress      true
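
(For context, a minimal sketch of one way to wire this up, assuming it is
paired with a serialized storage level as suggested; the names and the path
are placeholders.)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// spark.rdd.compress compresses *serialized* RDD partitions, so it only takes
// effect together with a *_SER storage level.
val conf = new SparkConf().set("spark.rdd.compress", "true")
val sc = new SparkContext(conf)
val cached = sc.textFile("hdfs:///path/to/training/data") // placeholder path
  .persist(StorageLevel.MEMORY_ONLY_SER)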



Re: Is there any way to control the parallelism in LogisticRegression

Posted by DB Tsai <db...@dbtsai.com>.
To save memory, I recommend compressing the cached RDD; it will be a couple
of times smaller than the original data set.


Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai



Re: Is there any way to control the parallelism in LogisticRegression

Posted by Jiusheng Chen <ch...@gmail.com>.
Thanks DB and Xiangrui. Glad to know you guys are actively working on it.

Another thing: did we evaluate the accuracy loss of using Float to store
values? Currently it is Double. Using fewer bits has the benefit of reducing
the memory footprint. According to Google, they even use 16 bits (a special
encoding scheme called q2.13) <http://jmlr.org/proceedings/papers/v28/golovin13.pdf>
in their learner without measurable loss, which can save 75% of the memory.
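
(Rough arithmetic for a dense weight vector with the 100M features mentioned
earlier in this thread: 100M values x 8 bytes = ~800 MB per copy with Double,
~400 MB with Float, and ~200 MB with a 16-bit encoding, which is where the
75% saving comes from.)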



Re: Is there any way to control the parallelism in LogisticRegression

Posted by DB Tsai <db...@dbtsai.com>.
With David's help today, we were able to implement elastic net GLM in
Spark. It's surprisingly easy, and with just some modification to breeze's
OWLQN code, it just works without further investigation.

We did a benchmark, and the coefficients are within 0.5% difference compared
with R's glmnet package. I guess this is the first truly distributed glmnet
implementation.

It still requires some effort to get it into MLlib; mostly API cleanup work.

1) I'll submit a PR to breeze which implements weighted regularization in
OWLQN.
2) This also depends on https://issues.apache.org/jira/browse/SPARK-2505,
for which we have an internal version requiring some cleanup for the open
source project.
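
(For readers new to the elastic net: the penalty being fit combines the two
regularizers,

    lambda * ( alpha * ||w||_1  +  (1 - alpha) / 2 * ||w||_2^2 ),

where alpha = 1 reduces to the pure L1/lasso case that plain OWLQN handles and
alpha = 0 reduces to the L2/ridge case that the existing L-BFGS path already
supports.)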


Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai



Re: Is there any way to control the parallelism in LogisticRegression

Posted by Xiangrui Meng <me...@gmail.com>.
+DB & David (They implemented OWLQN on Spark today.)