[GitHub] [incubator-druid] jihoonson opened a new issue #8061: Native parallel batch indexing with shuffle
URL: https://github.com/apache/incubator-druid/issues/8061

   ### Motivation
   
   General motivation for native batch indexing is described in https://github.com/apache/incubator-druid/issues/5543.
   
   We now have the parallel index task, but it doesn't support perfect rollup yet because it lacks a shuffle system.
   
   ### Proposed changes
   
   I propose adding a new mode to the parallel index task which supports perfect rollup via a two-phase shuffle.
   
   #### Two-phase partitioning with shuffle
   
   ![Phase 1](https://user-images.githubusercontent.com/2322288/59528209-2b746900-8ecd-11e9-8024-5b40f7521f49.png)
   
   Phase 1: each task partitions data by segmentGranularity and then by a hash or range key of some dimensions.
   
   ![Phase 2](https://user-images.githubusercontent.com/2322288/59528211-2d3e2c80-8ecd-11e9-80f0-a504449eef81.png)
   
   Phase 2: each task reads a set of partitions created by the tasks of Phase 1 and creates a segment per partition.
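
   For illustration, the sketch below shows how a Phase 1 task might assign an input row to a partition in the hash-based variant: first floor the row's timestamp to its segmentGranularity time chunk, then hash the configured partition dimensions modulo the number of shards. All names here (`HashPartitionAssigner`, `bucketStart`, `assignPartition`) are hypothetical.

   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.Objects;

   public class HashPartitionAssigner
   {
     /** Floor a timestamp to the start of its segmentGranularity time chunk. */
     public static long bucketStart(long timestampMillis, long segmentGranularityMillis)
     {
       return timestampMillis - Math.floorMod(timestampMillis, segmentGranularityMillis);
     }

     /**
      * Hash only the configured partition dimensions so that rows sharing the same
      * dimension values always land in the same partition, as required for perfect rollup.
      */
     public static int assignPartition(Map<String, Object> row, List<String> partitionDimensions, int numShards)
     {
       int hash = 0;
       for (String dim : partitionDimensions) {
         hash = 31 * hash + Objects.hashCode(row.get(dim));
       }
       // floorMod keeps the result non-negative even when the hash is negative.
       return Math.floorMod(hash, numShards);
     }
   }
   ```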
   
   #### `PartitionsSpec` support for `IndexTask` and `ParallelIndexTask`
   
   `PartitionsSpec` defines the secondary partitioning and is currently used by `HadoopIndexTask`. The interface should be generalized as below.
   
   ```java
   import java.util.List;
   import javax.annotation.Nullable;

   public interface PartitionsSpec
   {
     /** Number of shards per time chunk; null if it should be determined from the data. */
     @Nullable
     Integer getNumShards();

     /** Maximum number of rows per segment; null if numShards is given. */
     @Nullable
     Integer getMaxRowsPerSegment(); // or getTargetRowsPerSegment()

     /** Dimensions to partition on; null if not specified. */
     @Nullable
     List<String> getPartitionDimensions();
   }
   ```
   
   Hadoop tasks can use an extended interface specialized for Hadoop.
   
   ```java
   public interface HadoopPartitionsSpec extends PartitionsSpec
   {
     Jobby getPartitionJob(HadoopDruidIndexerConfig config);
     boolean isAssumeGrouped();
     boolean isDeterminingPartitions();
   }
   ```
   
   `IndexTask` currently provides duplicate partitioning configurations in its tuningConfig, such as `maxRowsPerSegment`, `maxTotalRows`, `numShards`, and `partitionDimensions`. These configurations will be deprecated, and the indexTask will support `PartitionsSpec` instead.
   
   To support `maxRowsPerSegment` and `maxTotalRows`, a new partitionsSpec could be introduced.
   
   ```java
   /**
    * PartitionsSpec for best-effort rollup
    */
   public class DynamicPartitionsSpec implements PartitionsSpec
   {
     private final int maxRowsPerSegment;
     private final int maxTotalRows;
   }
   ```
   
   This partitionsSpec will be supported as a new configuration in the tuningConfig of `IndexTask` and `ParallelIndexTask`. 
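
   For perfect rollup, a hash-based spec could implement the same interface. The sketch below uses the `HashedPartitionsSpec` name mentioned later in this proposal, but its exact fields are an assumption rather than a final API.

   ```java
   import java.util.List;
   import javax.annotation.Nullable;

   /**
    * Sketch only: a hash-based PartitionsSpec for perfect rollup.
    */
   public class HashedPartitionsSpec implements PartitionsSpec
   {
     @Nullable
     private final Integer numShards;          // null means "determine from the data"
     @Nullable
     private final Integer maxRowsPerSegment;  // used when numShards is null
     @Nullable
     private final List<String> partitionDimensions;

     public HashedPartitionsSpec(
         @Nullable Integer numShards,
         @Nullable Integer maxRowsPerSegment,
         @Nullable List<String> partitionDimensions
     )
     {
       this.numShards = numShards;
       this.maxRowsPerSegment = maxRowsPerSegment;
       this.partitionDimensions = partitionDimensions;
     }

     @Nullable
     @Override
     public Integer getNumShards()
     {
       return numShards;
     }

     @Nullable
     @Override
     public Integer getMaxRowsPerSegment()
     {
       return maxRowsPerSegment;
     }

     @Nullable
     @Override
     public List<String> getPartitionDimensions()
     {
       return partitionDimensions;
     }
   }
   ```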
   
   #### New parallel index task runner to support secondary partitioning
   
   `ParallelIndexSupervisorTask` is the supervisor task which orchestrates the parallel ingestion. It's responsible for spawning and monitoring sub tasks, and publishing created segments at the end of ingestion. 
   
   It uses `ParallelIndexTaskRunner` to run single-phase parallel ingestion without shuffle. To support two-phase ingestion, we can add a new implementation of `ParallelIndexTaskRunner`, `TwoPhaseParallelIndexTaskRunner`. `ParallelIndexSupervisorTask` will choose the new runner if partitionsSpec in tuningConfig is `HashedPartitionsSpec` or `RangePartitionsSpec`.
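
   A minimal sketch of that selection logic, assuming a hypothetical factory method on the supervisor task and using `SinglePhaseParallelIndexTaskRunner` as a placeholder name for the existing runner:

   ```java
   // Sketch only: pick the two-phase runner when the partitionsSpec requires
   // perfect rollup (hash or range partitioning); otherwise keep the existing
   // single-phase runner.
   ParallelIndexTaskRunner createRunner(PartitionsSpec partitionsSpec)
   {
     if (partitionsSpec instanceof HashedPartitionsSpec || partitionsSpec instanceof RangePartitionsSpec) {
       return new TwoPhaseParallelIndexTaskRunner(/* task toolbox, ingestion spec, ... */);
     } else {
       return new SinglePhaseParallelIndexTaskRunner(/* task toolbox, ingestion spec, ... */);
     }
   }
   ```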
   
   This new taskRunner does the following:

   - Add `TwoPhaseParallelIndexTaskRunner` as a new runner for the supervisor task
     - Spawns tasks for determining partitions (if `numShards` is missing in tuningConfig)
     - Spawns tasks for building partial segments (phase 1)
     - When all tasks of phase 1 finish, spawns new tasks for building the complete segments (phase 2)
     - Each phase 2 task is assigned one or multiple partitions
       - Each assigned partition is represented as an HTTP URL
   - Publishes the segments reported by phase 2 tasks.
   - Triggers intermediary data cleanup when the supervisor task finishes, regardless of its last status.
   
   The supervisor task provides an additional configuration in its tuningConfig, e.g., `numSecondPhaseTasks` or `inputRowsPerSecondPhaseTask`, to control the parallelism of phase 2. This will be improved to determine the optimal parallelism automatically in the future.
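
   For instance, given `numSecondPhaseTasks`, the supervisor could split the partition ids across phase 2 tasks round-robin. The sketch below is purely illustrative; the class and method names are hypothetical.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class Phase2Assignment
   {
     /**
      * Distribute partitionIds across phase 2 tasks round-robin.
      * Returns one list of partitionIds per phase 2 task.
      */
     public static List<List<Integer>> assign(int numPartitions, int numSecondPhaseTasks)
     {
       final List<List<Integer>> assignments = new ArrayList<>();
       for (int i = 0; i < numSecondPhaseTasks; i++) {
         assignments.add(new ArrayList<>());
       }
       for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
         assignments.get(partitionId % numSecondPhaseTasks).add(partitionId);
       }
       return assignments;
     }
   }
   ```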
   
   #### New sub task types
   
   ##### Partition-determining task

   - Similar to what indexTask or HadoopIndexTask does.
   - Scan the whole input data and collect a `HyperLogLog` per interval to compute the approximate cardinality.
   - numShards could be computed as below (a fuller sketch of the determination step follows the snippet):
   
   ```java
   numShards = (int) Math.ceil(
       (double) numRows / Preconditions.checkNotNull(maxRowsPerSegment, "maxRowsPerSegment")
   );
   ```
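
   Putting the pieces together, the sketch below outlines the determination step. It assumes Druid's `HyperLogLogCollector` (as used by the Hadoop determine-partitions job) and Guava's murmur3 hashing; everything else, including the class and method names, is hypothetical.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.HashMap;
   import java.util.Map;
   import com.google.common.hash.Hashing;
   import org.apache.druid.hll.HyperLogLogCollector;

   public class PartitionDeterminationSketch
   {
     // One HLL collector per segmentGranularity bucket, keyed by the bucket start millis.
     private final Map<Long, HyperLogLogCollector> collectors = new HashMap<>();
     private final long segmentGranularityMillis;

     public PartitionDeterminationSketch(long segmentGranularityMillis)
     {
       this.segmentGranularityMillis = segmentGranularityMillis;
     }

     // Add one input row: bucket its timestamp, then add a hash of its
     // partition-dimension values to that bucket's collector.
     public void addRow(long timestampMillis, String partitionDimensionValues)
     {
       final long bucketStart = timestampMillis - Math.floorMod(timestampMillis, segmentGranularityMillis);
       collectors
           .computeIfAbsent(bucketStart, k -> HyperLogLogCollector.makeLatestCollector())
           .add(Hashing.murmur3_128().hashString(partitionDimensionValues, StandardCharsets.UTF_8).asBytes());
     }

     // numShards per time chunk, using the same formula as the snippet above.
     public Map<Long, Integer> computeNumShards(int maxRowsPerSegment)
     {
       final Map<Long, Integer> numShardsPerBucket = new HashMap<>();
       collectors.forEach((bucketStart, collector) ->
           numShardsPerBucket.put(bucketStart, (int) Math.ceil(collector.estimateCardinality() / maxRowsPerSegment))
       );
       return numShardsPerBucket;
     }
   }
   ```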
   
   ##### Phase 1 task
   
   - Read data via the given firehose
   - Partition data by segmentGranularity and then by hash or range (and aggregate if rollup is enabled)
   - Partitions should be accessible by (supervisorTaskId, timeChunk, partitionId)
   - Write partitioned _segments_ to local disk (see the layout sketch after this list)
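
   A sketch of one possible on-disk layout satisfying that addressing scheme; everything below the `/configured/prefix/supervisorTaskId/` level described in the next section is an assumption of this sketch.

   ```java
   import java.io.File;

   public class IntermediarySegmentLocations
   {
     /**
      * Illustrative layout only:
      * {configuredPrefix}/{supervisorTaskId}/{timeChunkStart}_{timeChunkEnd}/{partitionId}/{subTaskId}
      */
     public static File partitionDir(
         File configuredPrefix,
         String supervisorTaskId,
         long timeChunkStart,
         long timeChunkEnd,
         int partitionId,
         String subTaskId
     )
     {
       return new File(
           configuredPrefix,
           supervisorTaskId + "/" + timeChunkStart + "_" + timeChunkEnd + "/" + partitionId + "/" + subTaskId
       );
     }
   }
   ```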
   
   ##### Phase 2 task
   
   - Download all partial segments from middleManagers where phase 1 tasks ran.
   - Merge all fetched segments into a single segment per partitionId.
   - Push the merged segments and report them to the supervisor task (a rough end-to-end sketch follows).
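
   A rough end-to-end sketch of this workflow against the shuffle API described in the next section; `fetchToLocalDisk`, `mergeSegments`, and `pushAndReport` are hypothetical placeholders for the real fetch, merge, and push logic.

   ```java
   import java.util.List;

   public class Phase2TaskSketch
   {
     // Sketch only: fetch every partial segment for the assigned partitionIds from
     // each middleManager, merge them per partitionId, then push and report the
     // merged segments to the supervisor task.
     void run(
         String supervisorTaskId,
         List<String> middleManagerHosts,
         List<Integer> assignedPartitionIds,
         String start,
         String end
     )
     {
       for (int partitionId : assignedPartitionIds) {
         for (String host : middleManagerHosts) {
           final String url = "http://" + host
                              + "/druid/worker/v1/shuffle/partition"
                              + "?supervisorTaskId=" + supervisorTaskId
                              + "&start=" + start
                              + "&end=" + end
                              + "&partitionId=" + partitionId;
           fetchToLocalDisk(url);                      // hypothetical: HTTP GET, save locally
         }
         pushAndReport(mergeSegments(partitionId));    // hypothetical merge and push helpers
       }
     }

     void fetchToLocalDisk(String url) { /* omitted */ }
     Object mergeSegments(int partitionId) { return null; /* omitted */ }
     void pushAndReport(Object mergedSegment) { /* omitted */ }
   }
   ```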
   
   #### MiddleManager as Intermediary data server
   
   MiddleManager (and new Indexer) should be responsible for serving intermediary data during shuffle.
   
   Each phase 1 task partitions input data and generates partitioned segments. These partitioned segments are stored on the local disk of the middleManager (or the indexer proposed in https://github.com/apache/incubator-druid/issues/7900). They would be stored under the `/configured/prefix/supervisorTaskId/` directory.
   
   When the supervisor task is finished (either succeeded or failed), the overlord sends cleanup requests with supervisorTaskId to all middleManagers (and indexers).
   
   ##### New MiddleManager APIs
   
   - GET `/druid/worker/v1/shuffle/partition?supervisorTaskId={taskId}&start={startTimeOfSegment}&end={endTimeOfSegment}&partitionId={partitionId}`
   
   Returns all partial segments generated by sub tasks of the given supervisor task, falling in the given interval and having the given partitionId.
   
   - DELETE `/druid/worker/v1/shuffle/{supervisorTaskId}`

   Removes all partial segments generated by sub tasks of the given supervisor task (an example request is sketched below).
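
   For example, the cleanup request the overlord would send to each middleManager might look like the following sketch; plain `HttpURLConnection` is used purely for illustration and the actual client wiring is not part of this proposal.

   ```java
   import java.net.HttpURLConnection;
   import java.net.URL;

   public class ShuffleCleanupSketch
   {
     // Sketch only: ask one middleManager to delete all partial segments
     // generated by sub tasks of the given supervisor task.
     static int cleanup(String middleManagerHost, String supervisorTaskId) throws Exception
     {
       final URL url = new URL("http://" + middleManagerHost + "/druid/worker/v1/shuffle/" + supervisorTaskId);
       final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
       connection.setRequestMethod("DELETE");
       try {
         return connection.getResponseCode();  // e.g., 200 on success
       } finally {
         connection.disconnect();
       }
     }
   }
   ```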
   
   #### New metrics
   
   - `ingest/task/time`: how long each task took
   - `ingest/task/bytes/processed`: how much data each task processed
   - `ingest/shuffle/bytes`: how much shuffle data each middleManager served
   - `ingest/shuffle/requests`: how many shuffle requests each middleManager served
   
   ### Rationale
   
   Two alternative designs were considered for the shuffle system, especially for the intermediary data server.

   Using the middleManager (or indexer) as the intermediary data server is the simplest design. In one alternative, phase 1 tasks would serve intermediary data for shuffle themselves. In that case, phase 1 tasks must keep running until phase 2 is finished, which means their resources are held for the whole duration of phase 2. This alternative is rejected in favor of better resource utilization.

   Another alternative is to have a single set of tasks process both phase 1 and phase 2. This design is rejected because it is not flexible enough to use cluster resources efficiently.
   
   ### Operational impact
   
   `maxRowsPerSegment`, `numShards`, `partitionDimensions`, and `maxTotalRows` in tuningConfig will be deprecated for indexTask. `partitionsSpec` will be provided instead. The deprecated values will be removed in the next major release after the upcoming one.
   
   ### Test plan
   
   Unit tests and integration tests will be implemented. I will also test this with our internal cluster once it's ready.
   
   ### Future work
   
   - The optimal parallelism for the phase 2 should be able to be determined automatically by collecting statistics during the phase 1.
   - To avoid the "too many open files" problem, the middleManager should be able to smoosh the intermediary segments into several large files.
   - If rollup is enabled, it could be better to combine intermediary data on the middleManager before sending it, similar to Hadoop's combiner.
     - This could be implemented to support seamless incremental segment merge in the middleManager.
   - In Phase 1, tasks might skip index generation for faster shuffle. In this case, Phase 2 tasks should be able to generate the complete indexes.
