Posted to dev@mahout.apache.org by Dmitriy Lyubimov <dl...@gmail.com> on 2015/04/07 00:56:09 UTC

Re: par() stuff

I probably will try to answer these on the list. Slack is only on my phone
now.

(1) par(), generally, does not shuffle. Internally its implementation
largely relies on the shuffle-less coalesce() Spark API.

What this means is that it will do a good job of reducing or increasing
parallelism 5x or more without a shuffle, while observing approximate
uniformity of splits.

(2) As a corollary, it will not eliminate the problem of empty
partitions.

(3) The optimizer will not create problems in RDDs if the initial RDD passed
to drmWrap() did not have problems.

(4) The optimizer does not validate RDDs (in drmWrap() or elsewhere) for
correctness, for cost reasons.

However, we probably do want to create a routine that validates the internal
RDD structure (as a map-only or all-reduce op), which tools like data
importers could use before passing data to the algebra.
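
To make that last point concrete, here is a minimal sketch of what such a
check could look like, assuming the row-wise DRM RDD is an RDD[(K, Vector)]
of Mahout vectors. The name validateDrmRdd is made up for illustration; this
is not existing Mahout API.

  import org.apache.mahout.math.Vector
  import org.apache.spark.rdd.RDD

  // Hypothetical routine -- just the shape of the map-only / all-reduce
  // structural check discussed above.
  def validateDrmRdd[K](rdd: RDD[(K, Vector)]): Seq[String] = {
    // One pass: per partition, count rows and collect the distinct vector sizes.
    val perPartition = rdd.mapPartitionsWithIndex { (part, it) =>
      var rows = 0L
      var sizes = Set.empty[Int]
      it.foreach { case (_, v) => rows += 1; sizes += v.size() }
      Iterator.single((part, rows, sizes))
    }.collect()

    val emptyParts = perPartition.filter(_._2 == 0L).map(_._1)
    val cardinalities = perPartition.flatMap(_._3).toSet

    Seq(
      if (emptyParts.nonEmpty) Some(s"empty partitions: ${emptyParts.mkString(", ")}") else None,
      if (cardinalities.size > 1) Some(s"mixed vector cardinalities: ${cardinalities.mkString(", ")}") else None
    ).flatten
  }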

-d

Re: par() stuff

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
On Mon, Apr 6, 2015 at 5:23 PM, Pat Ferrel <pa...@occamsmachete.com> wrote:

>
> Skewed or incorrect partitioning, given that I don’t know what that means,
> may or may not happen. If it needs to be checked or adjusted it certainly
> could be done. It would probably be best to do it in some variant or method
> of CheckpointedDrm, right?
>
>
> So it seems like we have two things to look at post 0.10.0:
> 1) Do we need an RDD validator or smart repartitioner, and what exactly
> should it do?
>

I don't believe we need a smart repartitioner, pragmatically. Coalesce,
maybe.
No, it is not a method on CheckpointedDrm -- that is abstract algebra and has
nothing to do with RDDs. It would just be a method somewhere in the
sparkbindings package, right next to drmWrap(). Maybe drmWrap() could also
accept a flag to do that automatically, false by default.
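
Roughly what that could look like, reusing the validateDrmRdd sketch from the
first message in this thread. drmWrapValidated and the validate flag are
hypothetical names, and the drmWrap() call shape shown here is approximate --
check the sparkbindings package object for the exact signature.

  import org.apache.mahout.math.Vector
  import org.apache.mahout.sparkbindings._
  import org.apache.spark.rdd.RDD

  import scala.reflect.ClassTag

  // Hypothetical helper living next to drmWrap() in the sparkbindings package.
  def drmWrapValidated[K: ClassTag](rdd: RDD[(K, Vector)],
                                    ncol: Int = -1,
                                    validate: Boolean = false) = {
    if (validate) {
      val problems = validateDrmRdd(rdd)   // sketch from the first message
      require(problems.isEmpty, s"malformed DRM RDD: ${problems.mkString("; ")}")
    }
    drmWrap(rdd, ncol = ncol)
  }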


> 2) What should we do to optimize reading many small files vs. fewer large
> files? It seems like the two cases are ignored by Spark, whether reading
> sequence files or text files.
>

I think this is what par() is for; coalesce will do nicely.

Strictly speaking, it would take a pretty weird loader to produce
significant partitioning skew. I think Spark's coalesce(shuffle = true)
should largely address these cases.
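
For illustration, a small self-contained sketch of the difference on a local
Spark context with toy data; nothing here is Mahout-specific.

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("coalesce-demo"))

  // 8 splits, most of them empty after a filter -- the kind of degenerate
  // loader output discussed in this thread.
  val skewed = sc.parallelize(1 to 1000, numSlices = 8).filter(_ % 100 == 0)

  // shuffle = true redistributes rows roughly uniformly across the requested
  // splits; shuffle = false (the default, and what par() leans on) only merges
  // existing splits locally, so it cannot by itself guarantee uniform,
  // non-empty splits.
  val evened = skewed.coalesce(2, shuffle = true)
  println(evened.glom().map(_.length).collect().mkString(", "))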

Re: par() stuff

Posted by Pat Ferrel <pa...@occamsmachete.com>.
The place I saw the failure (now apparently fixed) was in a range of a vector that had size == 0 and the size was not checked. I assume the real problem occurred earlier. So the exception is always a bad vector range.

We may also have empty partitions, not sure. The way to reproduce the error is to have a bunch of small input files with one tuple per line and 0 or more lines per file. They are used to create a matrix of vectors with a .groupByKey().map; the key is an Int in this case. See the element reader, which reads one tuple per line: https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala

The dimensionality is always correct per ncol and nrow, AFAIK; I’ve checked in the recent case. And since this can happen for A’A only, there has been no dimensionality adjustment to allow empty rows. This means there are guaranteed to be no empty blocks in mapBlock, for what it’s worth. I have not checked that every vector has the same cardinality, but I would be surprised if they didn’t, given the code.

Skewed or incorrect partitioning, given that I don’t know what that means, may or may not happen. If it needs to be checked or adjusted it certainly could be done. It would probably be best to do it in some variant or method of CheckpointedDrm, right?

This recent case seemed to be fixed by drm.repartition(someSmallNumber) but I imagine this is accidental and not even a good hack. Given the way the data was read this is likely to reduce the number of partitions drastically.

There is one issue that seems to surface this problem most often, and that is 
  var columns = mc.textFile(source).map { line => line.split(delimiter) }
where source is a very long list of files or directories created from small DStream batch RDDs. When textFile looks at this, it reads all of the HDFS file status objects to determine partitioning. This can take a long time, so it may be better to block up the files and read them inside a .map (read a group of files per task). This was the advice of a Spark committer. This seems like a good case for sc.textFile() having its own optimization step, to handle part files written by Spark Streaming or large files written by some other mechanism.
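
A rough sketch of the "block up the files" idea, assuming source is a
comma-separated list of plain file paths (directories would need an extra
listStatus step), sc is the underlying SparkContext wrapped by mc, delimiter
is in scope, and the group size of 50 is arbitrary:

  import java.net.URI

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  import scala.io.Source

  // Parallelize over groups of paths rather than over every split of every
  // tiny file, so the driver never has to list thousands of file statuses.
  val fileGroups = source.split(",").toSeq.grouped(50).toSeq

  val columns = sc
    .parallelize(fileGroups, numSlices = fileGroups.size)
    .flatMap { group =>
      group.flatMap { pathStr =>
        // Hadoop config and filesystem are created task-side.
        val fs = FileSystem.get(new URI(pathStr), new Configuration())
        val in = fs.open(new Path(pathStr))
        val lines = try Source.fromInputStream(in).getLines().toVector finally in.close()
        lines.map(_.split(delimiter))
      }
    }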

Not sure if these issues are related but it is the thing in common between the last two occurrences of the vector range problem.

So it seems like we have two things to look at post 0.10.0:
1) Do we need an RDD validator or smart repartitioner, and what exactly should it do?
2) What should we do to optimize reading many small files vs. fewer large files? It seems like the two cases are ignored by Spark, whether reading sequence files or text files.


On Apr 6, 2015, at 4:03 PM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

PS: problems of RDDs we are trying to feed to drmWrap() are mostly of 2
kinds:

(1) skewed/incorrect partitioning, e.g. due to prefiltering or degenerate
splitting (at least 1 row vector is required in every partition);

(2) invalid data dimensionality (although some operators may be forgiving
of that, in general, all vectors in a row-partitioned format must have the
same cardinality).

Either problem is the responsibility of the data loader (e.g. drmDfsRead()),
not of the algebra, so there is no need to try to hack it into the
optimizer's guts.


On Mon, Apr 6, 2015 at 3:56 PM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

> I probably will try to answer these on the list. Slack is only on my phone
> now.
> 
> (1) par(), generally, does not shuffle. Internally its implementation
> largely relies on the shuffle-less coalesce() Spark API.
> 
> What this means is that it will do a good job of reducing or increasing
> parallelism 5x or more without a shuffle, while observing approximate
> uniformity of splits.
> 
> (2) as a corollary, it means it will not eliminate problem of empty
> partitions.
> 
> (3) optimizer will not create problems in RDDs if initial rdd passed to
> drmWrap() did not have problems.
> 
> (4) optimizer does not validate rdds (in drmWrap() or elsewhere) for
> correctness for expense reasons.
> 
> However, we probably may want to create a routine that validates internal
> rdd structure (as a map-only or all-reduce op) which can be used by tools
> like data importers before passing data to algebra.
> 
> -d
> 


Re: par() stuff

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
PS: problems of RDDs we are trying to feed to drmWrap() are mostly of 2
kinds:

(1) skewed/incorrect partitioning, e.g. due to prefiltering or degenerate
splitting (at least 1 row vector is required in every partition);

(2) invalid data dimensionality (although some operators may be forgiving
of that, in general, all vectors in a row-partitioned format must have the
same cardinality).

Either problem is the responsibility of the data loader (e.g. drmDfsRead()),
not of the algebra, so there is no need to try to hack it into the
optimizer's guts. A toy example of input that satisfies the loader's
contract is sketched below.
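
A minimal sketch, assuming sc is a SparkContext and that drmWrap accepts the
RDD plus an ncol hint; check the sparkbindings package object for the exact
signature.

  import org.apache.mahout.math.{DenseVector, Vector}
  import org.apache.mahout.sparkbindings._

  // Toy loader output that satisfies the contract above: integer row keys,
  // every vector with the same cardinality (3), and no empty partitions.
  val rows = sc.parallelize(
    Seq[(Int, Vector)](
      0 -> new DenseVector(Array(1.0, 0.0, 2.0)),
      1 -> new DenseVector(Array(0.0, 3.0, 0.0)),
      2 -> new DenseVector(Array(4.0, 0.0, 5.0)),
      3 -> new DenseVector(Array(0.0, 6.0, 7.0))
    ),
    numSlices = 2                      // 2 rows per split, nothing empty
  )

  val drmA = drmWrap(rows, ncol = 3)   // now safe to hand off to the algebra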


On Mon, Apr 6, 2015 at 3:56 PM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

> I probably will try to answer these on the list. Slack is only on my phone
> now.
>
> (1) par(), generally, does not shuffle. Internally its implementation
> largely relies on the shuffle-less coalesce() Spark API.
>
> What this means is that it will do a good job of reducing or increasing
> parallelism 5x or more without a shuffle, while observing approximate
> uniformity of splits.
>
> (2) as a corollary, it means it will not eliminate problem of empty
> partitions.
>
> (3) optimizer will not create problems in RDDs if initial rdd passed to
> drmWrap() did not have problems.
>
> (4) optimizer does not validate rdds (in drmWrap() or elsewhere) for
> correctness for expense reasons.
>
> However, we probably may want to create a routine that validates internal
> rdd structure (as a map-only or all-reduce op) which can be used by tools
> like data importers before passing data to algebra.
>
> -d
>