Posted to issues@drill.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/07/04 05:53:00 UTC

[jira] [Commented] (DRILL-5211) Queries fail due to direct memory fragmentation

    [ https://issues.apache.org/jira/browse/DRILL-5211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073139#comment-16073139 ] 

ASF GitHub Bot commented on DRILL-5211:
---------------------------------------

Github user paul-rogers commented on the issue:

    https://github.com/apache/drill/pull/866
  
    This commit provides another two levels of foundation for size-aware vector writers in the Drill record readers.
    
    Much of the material below appears in Javadoc throughout the code. But, it is gathered here for quick reference to speed the code review.
    
    The PR is broken into commits by layer of function. It may be easier to review each commit one by one rather than looking at the whole mess in one big diff.
    
    ## Column Accessors
    
    A recent extension to Drill's set of test tools created a "row set" abstraction to allow us to create, and verify, record batches with very few lines of code. Part of this work involved creating a set of "column accessors" in the vector subsystem. Column readers provide a uniform API to obtain data from columns (vectors), while column writers provide a uniform writing interface.
    
    DRILL-5211 discusses a set of changes to limit value vectors to 16 MB in size (to avoid memory fragmentation due to Drill's two memory allocators.) The column accessors have proven to be so useful that they will be the basis for the new, size-aware writers used by Drill's record readers.
    
    Changes include:
    
    * Implement fill-empties logic for vectors that do not provide it.
    * Use the new size-aware methods, throwing vector overflow exceptions which can now occur.
    * Some fiddling to handle the non-standard names of vector functions.
    * Modify strings to use a default type of byte[], but offer a String version for convenience.
    * Add “finish batch” logic to handle values omitted at the end of a batch. (This is a bug in some existing record readers.)
    
    ## Result Set Loader
    
    The second layer of this commit is the new “result set loader.” This abstraction is an evolution of the “Mutator” class in the scan batch, when used with the existing column writers (which some readers use and others do not.)
    
    A result set loader loads a set of tuples (AKA records, rows) from any source (such as a record reader) into a set of record batches. The loader:
    
    * Divides records into batches based on a maximum row count or a maximum vector size, whichever occurs first. (Later revisions may limit overall batch size.)
    * Tracks the start and end of each batch.
    * Tracks the start and end of each row.
    * Provides column loaders to write each column value.
    * Handles overflow when a vector becomes full, but the client still must finish writing the current row.
    
    The original Mutator class divided up responsibilities:
    
    * The Mutator handled the entire record batch
    * An optional VectorContainerWriter writes each record
    
    The result set loader follows this same general pattern.
    
    * The result set loader handles the entire record batch (really, a series of batches that make up the entire result set: hence the name.)
    * The TupleLoader class provides per-tuple services, which mostly consist of access to the column loaders.
    * A tuple schema defines the schema for the result set (see below.)
    
    To hide this complexity from the client, a ResultSetLoader interface defines the public API. Then, a ResultSetLoaderImpl class implements the interface with all the gory details. Separate classes handle each column, the result set schema, and so on.
    
    This class is pretty complex, with a state machine per batch and per column, so take your time reviewing it.
    
    ## Column Loaders
    
    The column writers are low-level classes that interface between a consumer and a value vector. To create the tuple loader we need a higher-level abstraction: the column loader. (Note that there is no equivalent for reading columns at this time: generated code does the reading in its own special way for each operator.)
    
    Column loaders have a number of responsibilities:
    
    * Single class used for all data types. No more casting.
    * Transparently handle vector overflow and rollover.
    * Provide generic (Object-based) setters, most useful for testing.
    
    Because this commit seeks to prove the concept, the column loader supports only a subset of types. Adding the other types is simply a matter of copy & paste, and will be done once things settle down. For now, the focus is on int and Varchar types (though the generic version supports all types.)
    
    To handle vector overflow, each “set” method:
    
    * Tries to write the value into the current vector (using a column writer)
    * If overflow occurs, tells the listener (the row set mutator) to create a new vector
    * Tries the write a second time using the new vector
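    
    The three steps above can be sketched roughly as follows. This is only an illustration under simplifying assumptions: `OverflowRetrySketch`, `IntVector`, `OverflowListener` and `IntColumnLoader` are hypothetical stand-ins, not the classes in this PR, and the "vector" is just a list with a fixed value count.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Hypothetical sketch: none of these names are Drill's actual classes.
    class OverflowRetrySketch {
    
        /** Stand-in for the vector overflow exception mentioned above. */
        static class OverflowException extends Exception {}
    
        /** Stand-in for a size-limited value vector of ints. */
        static class IntVector {
            final int capacity;
            final List<Integer> values = new ArrayList<>();
    
            IntVector(int capacity) { this.capacity = capacity; }
    
            void write(int value) throws OverflowException {
                if (values.size() >= capacity) {
                    throw new OverflowException();
                }
                values.add(value);
            }
        }
    
        /** Stand-in for the listener that supplies a fresh vector on overflow. */
        interface OverflowListener {
            IntVector newVector();
        }
    
        /** Stand-in for a column loader wrapping a low-level writer. */
        static class IntColumnLoader {
            IntVector current;
            final OverflowListener listener;
    
            IntColumnLoader(IntVector first, OverflowListener listener) {
                this.current = first;
                this.listener = listener;
            }
    
            void setInt(int value) {
                try {
                    current.write(value);               // 1. try the current vector
                } catch (OverflowException e) {
                    current = listener.newVector();     // 2. ask the listener for a new vector
                    try {
                        current.write(value);           // 3. retry once on the new vector
                    } catch (OverflowException e2) {
                        throw new IllegalStateException("Value does not fit in an empty vector", e2);
                    }
                }
            }
        }
    
        public static void main(String[] args) {
            IntVector first = new IntVector(2);
            IntColumnLoader loader = new IntColumnLoader(first, () -> new IntVector(2));
            loader.setInt(10);
            loader.setInt(20);
            loader.setInt(30); // overflows the first vector, lands in the second
            System.out.println(first.values + " then " + loader.current.values);
        }
    }
    ```
    
    The caller of `setInt()` never sees the overflow exception; only a value too large for an empty vector surfaces as an error.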
    
    The set of column writers must be synchronized (not in a multi-thread sense) on the current row position. As in the row set test utilities, a WriterIndex performs this task. (In fact, this is derived from the same writer index used for the row set test code and is defined by the column accessor code.)
    
    As with the row set version, a variety of column loader implementations exist depending on whether the underlying column is a scalar, an array, a map (not yet supported), etc. All this is transparent to the client of the tuple loader.
    
    ## Vector Overflow Logic
    
    The heart of this abstraction is that last point: the ability to detect when a vector overflows, switch in a new vector, and continue writing. Several tricks are involved.
    
    Suppose we have a row of five columns: a through e. The code writes a and b. Then, c overflows. The code can’t rewrite a and b. To handle this, the tuple loader:
    
    * Creates a new, small set of vectors called the “overflow batch”
    * Copies columns a and b from the current batch to the overflow batch.
    * Writes column c to the overflow batch.
    * Allows the client code to finish writing columns d and e (to the overflow batch).
    * Reports to the client that the batch is full.
    
    Note that the client is completely unaware that any of the above occurred: it just writes a row and asks if it can write another.
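    
    A minimal sketch of that rollover, assuming int-only columns and a per-column value-count limit in place of a real byte-size limit. `OverflowBatchSketch` and its methods are hypothetical names for illustration, not the tuple loader in this PR.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Hypothetical sketch of row rollover; not Drill's actual tuple loader.
    class OverflowBatchSketch {
        final int[] capacity;              // max values per column vector
        List<List<Integer>> current;       // batch being filled
        List<List<Integer>> overflow;      // non-null once a column has overflowed
        int rowCount;                      // complete rows in the current batch
    
        OverflowBatchSketch(int... capacity) {
            this.capacity = capacity;
            this.current = newBatch();
        }
    
        private List<List<Integer>> newBatch() {
            List<List<Integer>> batch = new ArrayList<>();
            for (int i = 0; i < capacity.length; i++) {
                batch.add(new ArrayList<>());
            }
            return batch;
        }
    
        /** Writes one column of the in-flight row. */
        void set(int col, int value) {
            if (overflow == null && current.get(col).size() >= capacity[col]) {
                overflow = newBatch();
                // Move values already written for the in-flight row to the overflow batch.
                for (int c = 0; c < capacity.length; c++) {
                    List<Integer> vector = current.get(c);
                    if (c != col && vector.size() > rowCount) {
                        overflow.get(c).add(vector.remove(vector.size() - 1));
                    }
                }
            }
            (overflow != null ? overflow : current).get(col).add(value);
        }
    
        /** Ends the row; returns true if the client may write another row to this batch. */
        boolean saveRow() {
            if (overflow != null) {
                return false;              // the in-flight row lives in the overflow batch
            }
            rowCount++;
            return true;
        }
    
        /** Hands over the full batch; the overflow batch (if any) becomes the current one. */
        List<List<Integer>> harvest() {
            List<List<Integer>> full = current;
            rowCount = (overflow != null) ? 1 : 0;
            current = (overflow != null) ? overflow : newBatch();
            overflow = null;
            return full;
        }
    
        public static void main(String[] args) {
            // Columns a..e; column c (index 2) holds only one value per vector.
            OverflowBatchSketch loader = new OverflowBatchSketch(4, 4, 1, 4, 4);
            for (int col = 0; col < 5; col++) loader.set(col, col + 1);
            System.out.println(loader.saveRow());   // true
            for (int col = 0; col < 5; col++) loader.set(col, col + 6);
            System.out.println(loader.saveRow());   // false: batch is full
            System.out.println(loader.harvest());   // [[1], [2], [3], [4], [5]]
            System.out.println(loader.current);     // [[6], [7], [8], [9], [10]]
        }
    }
    ```
    
    The client only calls `set()` and `saveRow()`; the second row ends up intact in the next batch even though the overflow happened in the middle of it.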
    
    ## Skipping Columns
    
    The loader must also handle a reader, such as Parquet, that skips columns if they are null. There were bugs in Drill’s vectors for this case, and temporary patches were made in a number of places to make this work. The trick should also work for arrays (a null array is allowed; Drill represents it as an empty array), but that code was also broken. For good measure, the code now also allows skipping non-null columns if a good “empty” value is available: 0 for numbers, blank for strings. This behavior is needed for the CSV reader: if a line is missing a field, the CSV reader treats it as an empty (not null) field.
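    
    The fill-empties and “finish batch” behavior can be pictured with a small sketch. It assumes rows are written in increasing order and uses a plain list as the “vector”; `FillEmptiesSketch` and `StringColumn` are hypothetical names, not Drill's vector code.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Hypothetical sketch of "fill empties"; not Drill's vector code.
    class FillEmptiesSketch {
    
        static class StringColumn {
            final List<String> values = new ArrayList<>();
            final String emptyValue;   // "" for a CSV-style non-null column, null for a nullable one
    
            StringColumn(String emptyValue) { this.emptyValue = emptyValue; }
    
            /** Back-fills any rows skipped since the last write, then writes the value. */
            void set(int rowIndex, String value) {
                fillTo(rowIndex);
                values.add(value);
            }
    
            /** "Finish batch": pads rows omitted at the end of the batch. */
            void finishBatch(int rowCount) {
                fillTo(rowCount);
            }
    
            private void fillTo(int rowIndex) {
                while (values.size() < rowIndex) {
                    values.add(emptyValue);
                }
            }
        }
    
        public static void main(String[] args) {
            StringColumn col = new StringColumn("");
            col.set(0, "a");
            col.set(3, "d");      // rows 1 and 2 were skipped
            col.finishBatch(5);   // row 4 was never written
            System.out.println(col.values); // [a, , , d, ]
        }
    }
    ```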
    
    ## Result Set Schema
    
    The tuple loader is designed to handle two kinds of tables: “early schema” (such as Parquet and CSV) define the schema up front. “Late schema” (such as JSON) discover the schema during reading. The tuple loader allows either form, and, in fact, uses the same mechanism. (The only caveat is that issues occur if adding a non-null column after the first row has been loaded.)
    
    Consumers of batches will, of course, want to know that the schema changed. Providing a simple flag is muddy: when should it be reset? A better solution is to provide a schema version which is incremented each time a column is added. (Columns cannot be removed or changed, at least not yet.)
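    
    A rough sketch of why a version number is easier to use than a flag: each consumer remembers the last version it saw, so nothing ever needs to be reset. The names `SchemaVersionSketch`, `addColumn()`, `schemaVersion()` and `Consumer` are made up for illustration.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Hypothetical sketch of a versioned schema; not the PR's actual classes.
    class SchemaVersionSketch {
        private final List<String> columns = new ArrayList<>();
        private int version;
    
        /** Adds a column and bumps the version; returns the new column's index. */
        int addColumn(String name) {
            columns.add(name);
            version++;
            return columns.size() - 1;
        }
    
        int schemaVersion() { return version; }
    
        /** Each consumer tracks the last version it saw, so there is no shared flag to reset. */
        static class Consumer {
            private int lastSeenVersion;
    
            boolean schemaChanged(SchemaVersionSketch schema) {
                boolean changed = schema.schemaVersion() != lastSeenVersion;
                lastSeenVersion = schema.schemaVersion();
                return changed;
            }
        }
    
        public static void main(String[] args) {
            SchemaVersionSketch schema = new SchemaVersionSketch();
            Consumer consumer = new Consumer();
            schema.addColumn("a");
            System.out.println(consumer.schemaChanged(schema)); // true
            System.out.println(consumer.schemaChanged(schema)); // false
            schema.addColumn("b");
            System.out.println(consumer.schemaChanged(schema)); // true
        }
    }
    ```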
    
    ## Internal Vectors vs. Vector Container
    
    The result set loader uses its own mechanism to manage vectors within the loader. Vectors are stored on each column to allow quick, indexed access and to simplify creating new columns.
    
    However, the consumer of the batch (eventually, a new scan batch), wants a vector container. A special class handles this translation, including incrementally modifying the container as new columns are added.
    
    ## Logical Tuples
    
    As if the above were not complex enough, we must deal with another layer of complexity. Suppose we have a query of the form:
    
    ```
    SELECT * FROM myTable
    ```
    
    In such a query, the reader will read all columns using the tuple loader. Very simple. But, many queries are of the form:
    
    ```
    SELECT a, b FROM myTable
    ```
    
    Where “myTable” contains columns (a, b, c, d, e). There is no point in reading columns c, d and e: we’d just throw them away. Instead, we want to define a “logical tuple” that contains just (a, b) and not even read the others.
    
    Each Drill record reader does this in its own way. The tuple loader provides a new, standard solution in the form of a logical tuple loader.
    
    The logical tuple loader works just like the regular one: but it knows which columns are projected and which are not. If the reader asks for a projected column, the logical loader returns a column loader to load the value. But, when the reader asks for a non-projected column, the logical loader simply returns null, telling the application to discard that column (or, better, to not read it at all.)
    
    The logical loader is needed because the regular loader will create columns on the fly: the logical loader intercepts the column request and returns null instead.
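    
    As a rough illustration of the interception, assuming a simple name-based projection list (all class and method names below are hypothetical, not the ones in this PR):
    
    ```java
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;
    
    // Hypothetical sketch of projection via a "logical" loader.
    class LogicalTupleSketch {
    
        static class ColumnLoader {
            final String name;
            Object lastValue;
            ColumnLoader(String name) { this.name = name; }
            void set(Object value) { lastValue = value; }
        }
    
        /** The regular loader: creates a column the first time it is requested. */
        static class TupleLoader {
            final Map<String, ColumnLoader> columns = new LinkedHashMap<>();
    
            ColumnLoader column(String name) {
                return columns.computeIfAbsent(name, ColumnLoader::new);
            }
        }
    
        /** The logical loader: hands out loaders only for projected columns. */
        static class LogicalTupleLoader {
            final TupleLoader physical;
            final Set<String> projected;
    
            LogicalTupleLoader(TupleLoader physical, Set<String> projected) {
                this.physical = physical;
                this.projected = projected;
            }
    
            ColumnLoader column(String name) {
                // Returning null here keeps the regular loader from creating the column.
                return projected.contains(name) ? physical.column(name) : null;
            }
        }
    
        public static void main(String[] args) {
            TupleLoader physical = new TupleLoader();
            LogicalTupleLoader logical =
                new LogicalTupleLoader(physical, new HashSet<>(Arrays.asList("a", "b")));
            for (String name : new String[] {"a", "b", "c", "d", "e"}) {
                ColumnLoader loader = logical.column(name);
                if (loader != null) {
                    loader.set("value of " + name);  // only projected columns are read
                }
            }
            System.out.println(physical.columns.keySet()); // [a, b]
        }
    }
    ```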
    
    ## Materialized Schema
    
    For reasons that will become clear in the next PR, the scan batch ends up doing quite a bit of semantic analysis to map from the select list and the table schema to the result schema. Drill provides a BatchSchema class that is useful, but limited in this context. To solve this problem, a new class, MaterializedSchema, does what BatchSchema does, but allows fast access by both name and position, and allows the schema to grow dynamically.
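    
    The general idea of a schema with fast access by name and by position that can also grow is sketched below. This is not the MaterializedSchema code itself: `GrowableSchemaSketch`, `Column` and the method names are hypothetical, and column types are reduced to strings.
    
    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    // Hypothetical sketch: a list for positional access plus a map for name lookup.
    class GrowableSchemaSketch {
    
        static class Column {
            final String name;
            final String type;
            Column(String name, String type) { this.name = name; this.type = type; }
        }
    
        private final List<Column> byPosition = new ArrayList<>();
        private final Map<String, Integer> indexByName = new HashMap<>();
    
        /** Appends a column and returns its position. */
        int add(String name, String type) {
            if (indexByName.containsKey(name)) {
                throw new IllegalArgumentException("Duplicate column: " + name);
            }
            int index = byPosition.size();
            byPosition.add(new Column(name, type));
            indexByName.put(name, index);
            return index;
        }
    
        Column column(int index) { return byPosition.get(index); }
    
        /** Returns the named column, or null if the schema has no such column. */
        Column column(String name) {
            Integer index = indexByName.get(name);
            return index == null ? null : byPosition.get(index);
        }
    
        int size() { return byPosition.size(); }
    
        public static void main(String[] args) {
            GrowableSchemaSketch schema = new GrowableSchemaSketch();
            schema.add("a", "INT");
            schema.add("b", "VARCHAR");
            System.out.println(schema.column(1).name + " " + schema.column("a").type); // b INT
        }
    }
    ```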
    
    The row set abstractions for testing already had a concept of a tuple schema, so this was extracted and extended to act as the foundation for the materialized schema.
    
    ## Result Vector Cache
    
    Above we mentioned that the tuple loader allows schema changes on the fly. As the next PR will make more clear, downstream operators want a fixed set of vectors. To assist with this, the tuple loader uses a “result vector cache”. Let’s say a scanner reads two JSON files with the same schema. The first creates the schema and vectors. The second is obligated to use the same vectors. This is a royal pain. But, the vector cache does it automatically: when the tuple loader adds a new column, it checks if the vector already exists in the cache and reuses it. If not there, the cache adds it and returns it so that it is there the next time around.
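    
    A minimal sketch of the cache lookup, assuming vectors are keyed by column name and that a type mismatch simply replaces the cached entry (that last part is my assumption for the sketch, as are all the names):
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    
    // Hypothetical sketch of a result vector cache; not the PR's actual class.
    class VectorCacheSketch {
    
        /** Stand-in for a value vector. */
        static class Vector {
            final String name;
            final String type;
            Vector(String name, String type) { this.name = name; this.type = type; }
        }
    
        private final Map<String, Vector> cache = new HashMap<>();
    
        /** Returns the cached vector for a column, creating and caching it if needed. */
        Vector addOrGet(String name, String type) {
            Vector existing = cache.get(name);
            if (existing != null && existing.type.equals(type)) {
                return existing;               // same column as before: reuse the vector
            }
            Vector created = new Vector(name, type);
            cache.put(name, created);          // assumption: a type change replaces the entry
            return created;
        }
    
        public static void main(String[] args) {
            VectorCacheSketch cache = new VectorCacheSketch();
            Vector fromFirstFile = cache.addOrGet("a", "VARCHAR");
            Vector fromSecondFile = cache.addOrGet("a", "VARCHAR");
            System.out.println(fromFirstFile == fromSecondFile); // true
        }
    }
    ```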
    
    ## Map, List, Union and Other Complex Support
    
    This commit does not yet address complex types such as maps, lists, union vectors, and so on. The idea is to get the basics to work first. The commit does, however, support arrays of primitive and Varchar types.
    
    ## Row Set Test Classes
    
    The row set test classes and the above new classes share the same column accessors. The test classes were updated to catch the new overflow exception. Because the test code is used to create small batches as test input data, the overflow exception is translated to an unchecked exception to keep test code simple.
    
    Several row set index classes were moved and adjusted to use the revised form needed for the tuple loader.
    
    A few names were changed to reduce confusion (mine) over what they meant.
    
    ## Unit Tests
    
    All of the above is pretty thoroughly tested via unit tests. In fact, the unit tests are a good place to start (now I tell you!) in order to see how client code uses the various abstractions.
    
    The bit of unit test structure that handled system options turned out to be wrong. Modified it to use the defaults defined in the system option manager, which required changing the visibility of the defaults table.
    
    ## Other
    
    Some unit tests were updated to use new features which become available in this PR. See TestFillEmpties and TestVectorLimits.
    
    The `equals()` method in BatchSchema is badly broken. Cleaned it up some. But, didn’t want to change it too much in case anything depends on the current, broken, semantics. So, added a new `isEquivalent` method to provide the correct semantics. Added an `isEquivalent()` method to the MaterializedField as well that will ignore the “implementation” columns that hang off of types such as nullables, repeated, etc. That is, two repeated columns are identical if their type is identical, regardless of whether one has the “$offsets” child or not.
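    
    The intended comparison can be sketched as follows. This is not the actual `isEquivalent()` code: `Field` is a hypothetical stand-in for MaterializedField, and treating any child whose name starts with `$` as an “implementation” column is an assumption made for the sketch.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    // Hypothetical sketch of equivalence that ignores implementation children.
    class FieldEquivalenceSketch {
    
        static class Field {
            final String name;
            final String type;     // e.g. "VARCHAR"
            final String mode;     // e.g. "REPEATED"
            final List<Field> children;
    
            Field(String name, String type, String mode, Field... children) {
                this.name = name;
                this.type = type;
                this.mode = mode;
                this.children = Arrays.asList(children);
            }
    
            /** Assumption for this sketch: implementation children have names starting with '$'. */
            private List<Field> logicalChildren() {
                List<Field> result = new ArrayList<>();
                for (Field child : children) {
                    if (!child.name.startsWith("$")) {
                        result.add(child);
                    }
                }
                return result;
            }
    
            boolean isEquivalent(Field other) {
                if (!name.equals(other.name) || !type.equals(other.type) || !mode.equals(other.mode)) {
                    return false;
                }
                List<Field> mine = logicalChildren();
                List<Field> theirs = other.logicalChildren();
                if (mine.size() != theirs.size()) {
                    return false;
                }
                for (int i = 0; i < mine.size(); i++) {
                    if (!mine.get(i).isEquivalent(theirs.get(i))) {
                        return false;
                    }
                }
                return true;
            }
        }
    
        public static void main(String[] args) {
            Field withOffsets = new Field("c", "VARCHAR", "REPEATED",
                new Field("$offsets", "UINT4", "REQUIRED"));
            Field withoutOffsets = new Field("c", "VARCHAR", "REPEATED");
            System.out.println(withOffsets.isEquivalent(withoutOffsets)); // true
        }
    }
    ```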


> Queries fail due to direct memory fragmentation
> -----------------------------------------------
>
>                 Key: DRILL-5211
>                 URL: https://issues.apache.org/jira/browse/DRILL-5211
>             Project: Apache Drill
>          Issue Type: Bug
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>             Fix For: 1.9.0
>
>         Attachments: ApacheDrillMemoryFragmentationBackground.pdf, ApacheDrillVectorSizeLimits.pdf, EnhancedScanOperator.pdf, ScanSchemaManagement.pdf
>
>
> Consider a test of the external sort as follows:
> * Direct memory: 3GB
> * Input file: 18 GB, with one Varchar column of 8K width
> The sort runs, spilling to disk. Once all data arrives, the sort begins to merge the results. But, to do that, it must first do an intermediate merge. For example, in this sort, there are 190 spill files, but only 19 can be merged at a time. (Each merge file contains 128 MB batches, and only 19 can fit in memory, giving a total footprint of 2.5 GB, well below the 3 GB limit.)
> Yet, when loading batch xx, Drill fails with an OOM error. At that point, total available direct memory is 3,817,865,216. (Obtained from {{maxMemory}} in the {{Bits}} class in the JDK.)
> It appears that Drill wants to allocate 58,257,868 bytes, but the {{totalCapacity}} (again in {{Bits}}) is already 3,800,769,206, causing an OOM.
> The problem is that, at this point, the external sort should not ask the system for more memory. The allocator for the external sort is at just 1,192,350,366 before the allocation request. Plenty of spare memory should be available, released when the in-memory batches were spilled to disk prior to merging. Indeed, earlier in the run, the sort had reached a peak memory usage of 2,710,716,416 bytes. This memory should be available for reuse during merging, and is plenty sufficient to fill the particular request in question.


