You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/06/14 23:38:27 UTC

[GitHub] [incubator-druid] jon-wei opened a new issue #7900: Develop a new `BigIndexer` process for running ingestion tasks

jon-wei opened a new issue #7900: Develop a new `BigIndexer` process for running ingestion tasks
URL: https://github.com/apache/incubator-druid/issues/7900
 
 
   ### Motivation
   
   The MiddleManager currently runs each task in a separate JVM, with identical configuration across all tasks spawned by a given MM. This model has some drawbacks:
   - Complexity of configuration: There is a level of indirection in the configurations (`druid.indexer.runner.javaOpts`), and to understand total resource usage the user must consider both the MM config (num workers) and the individual sub-JVM task configurations.
   - Easy to overprovision resources: On a single MM, all tasks will have identical sizing. However, not all tasks have the same resource requirements. This can be addressed to some extent with worker affinity, for cases where task load levels can be distinctly categorized based on datasource. However, even for a single datasource, not all tasks necessarily have equal resource requirements.
   - Redundant resource provisioning: Some in-memory information, lookup maps in particular, are common across tasks, but having separate task JVMs requires a separate copy of this info in each JVM. With large lookup maps, this can consume an immense amount of memory and considerably limit how many tasks an MM could run.
   
   ### Proposed changes
   
   A new process type, BigIndexer (tentative name), will be added. This new process type is an alternative to the MiddleManager that runs all tasks in the single BigIndexer JVM instead of forking new processes.
   
   #### Query processing
   
   - All queries made to tasks managed by the BigIndexer will be executed using shared processing buffers and threads (the standard `druid.processing.numThreads` and related configs).
   - Appenderator should be modified to support external segment management (i.e., the BigIndexer will take care of announcing segments and serving them instead of the individual tasks). `sinkTimeline` could be externally provided by the BigIndexer, used to back QueryRunner instances created by BigIndexer.
   - Each task would still maintain its own Appenderator instance for per-task memory tracking
   
   #### Task API resources
   
   The BigIndexer will have a single ChatHandlerResource at `/druid/worker/v1/chat/{id}`, serving requests for all tasks managed by the BigIndexer.
   
   #### Task Resource Management
   
   The BigIndexer will have a configurable global heap limit `globalIngestionHeapLimitBytes` across all tasks that applies to ingestion workloads (what `maxBytesInMemory` is currently estimating) and merging workloads (related to <merging patch>).
   
   `globalIngestionHeapLimitBytes` should be lower than the total JVM heap size, to leave space for query workloads and to account for the fuzziness of the memory estimates being used.
   
   When the sum of ingestion memory usage and merging memory usage across all tasks reaches `globalIngestionHeapLimitBytes`, the BigIndexer will trigger a global persist, causing each managed task to persist its in-heap segment data. 
   
   Per-task ingestion heap memory usage will continue to be tracked using the same mechanisms that support `maxBytesInMemory`. 
   
   To track per-task merging memory usage, the task flow will change slightly:
   - Before attempting to merge segments, a task will make a request to the BigIndexer, indicating that it is about to merge.
   - When a merge notice is received, the BigIndexer will attempt to record two memory allocations in its memory usage tracking state: 
     - a fixed amount of heap. This can scale based on some proportion of `globalIngestionHeapLimitBytes`.
     - a fixed amount of direct memory, needed for decompression buffers and dictionary conversions. This can scale based on the size in bytes of the fuzzy `+1` factor in the familiar (`druid.processing.numThreads` + `druid.processing.numMergeBuffers` + 1) * `druid.processing.buffer.sizeBytes` formula.
     - Note that this will require changes to segment merging to allow it to respect memory bounds
   
   The BigIndexer will impose a resource management model that only considers byte-based memory estimates and limits. Row-based limits in any task specs will be ignored (rewritten to whatever represents "unlimited" for a given property).
   
   Each task will not have a fixed individual memory limit by default, only the global limit is applied. The BigIndexer will also have a mode where the global limit is divided evenly across the number of worker slots.
   
   The BigIndexer will allow optional per-task `maxBytesInMemory` configurations for tasks. If an individual `maxBytesInMemory` limit is hit, a task will persist individually. This is to help address potential memory starvation issues, when a subset of tasks have significantly higher data generation rates than other tasks. The per-task limit would be used to constrain tasks that are known ahead of time to have disparately high data generation rates.
   
   #### Task Assignments
   
   In the BigIndexer, different tasks can consume different amounts of resources, when using the default mode where tasks do not have individual `maxBytesInMemory` limits. This is a significant change from the existing MM model, where all tasks receive identical resource allocations.
   
   While this can result in better resource utilization when a combination of small and big tasks run together, it opens potential for skew in assigned workloads (e.g. a majority of high-resource consumption tasks happen to be assigned at the same time to a single BigIndexer). 
   
   To address this initially, task assignments to BigIndexer processes can be done mostly randomly (with some consideration for how many tasks have already been assigned to a BigIndexer) , combined with deployment guidance in the docs that instruct users to limit task durations when using BigIndexer in this mode.
   
   If the BigIndexer is running in the mode where each task has an equal share of the global heap limit, then the traditional MM task assignment algorithm can be used.
   
   ### Rationale
   
   To address the motivating concerns, other approaches are possible:
   
   - To simplify MM configuration, we could have the MM take total heap/direct memory settings and divide that across the tasks. This would still have the potential for overprovisioning though.
   - To avoid overprovisioning, we could have the MM spawn each task with unlimited heap/direct memory and apply a similar memory limiting protocol across the MM and tasks as described in this proposal. However, doing this memory management within a single process is simpler and more reliable than in a multi-process system.
   - To avoid redundant provisioning (where lookups are the main concern presently), we could memory map the lookups from files on disk and share the page cache across tasks. The lookup maps could be potentially evicted from page cache however, and lookups are in the hot path of queries, which is likely undesirable.
   
   The proposed approach was chosen because it addresses all three concerns.
   
   The primary drawback of the proposed approach is the loss of fault isolation from having separate task processes. For this reason, this proposal suggests adding a new process type instead of replacing the MM. 
   
   The loss of fault isolation means that the BigIndexer will need supporting patches to function well, described in the future work section.
   
   ### Operational impact
   
   - This proposal will not deprecate anything.
   - The BigIndexer should not be deployed until a user has fully upgraded their cluster to a version supporting BigIndexer. Older versions will not recognize the new process type.
   - If a user is downgrading, they should replace their BigIndexer instances with MiddleManagers first.
   
   ### Test plan (optional)
   
   The BigIndexer will be deployed in large clusters that are currently using MiddleManagers, completely replacing the MiddleManagers, and the correctness and stability of ingestion will be verified over time.
   
   ### Future work
   
   For the proposed approach to work well, we will need to adjust the segment merging sequence such that its memory usage is more bounded: currently, ingestion tasks can be prone to OOM during index merging, and the impact of such failures would be amplified in the single-process BigIndexer.
   
   We will also need to address the memory estimation issues with growable aggregators, described in issue https://github.com/apache/incubator-druid/issues/6743, although this is a less crucial supporting patch compared to memory bounded index merging.
   
   The initial implementation of BigIndexer will not be exposed to users in documentation: until the bounded memory segment merging is supported, BigIndexers will not be stable enough to be a practical choice.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org