Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/01/23 00:57:10 UTC

[GitHub] [druid] jihoonson opened a new issue #9241: Append support for hash/range-partitioned segments

URL: https://github.com/apache/druid/issues/9241
 
 
   ### Motivation
   
   Druid currently doesn't support appending to hash- or range-partitioned segments, even though this is needed fairly often in production.
   
   ### Proposed changes
   
   This proposal supports appending segments only when the new segments are partitioned in the same way as the existing ones. For example, you can append new hash-partitioned segments to an existing hash-partitioned datasource if they are partitioned using exactly the same hash function. However, you cannot append range-partitioned segments to a hash-partitioned datasource. You can always append linearly partitioned segments to a linearly partitioned datasource.
   
   In this proposal, I would like to use the term "partition ID" to represent the last number in the segment ID. For example, given a segment `myDatasource_2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z_2020-01-02T02:23:54.235Z_2`, its partition ID is 2. The partition ID is regarded as 0 if it's missing in the segment ID.
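
   As a minimal illustration (a hypothetical helper, not Druid's actual segment ID parser), the partition ID can be read off the end of the segment ID, defaulting to 0 when it's absent:

   ```java
   // Hypothetical helper (not Druid's SegmentId parser): reads the trailing
   // partition ID off a segment ID string, defaulting to 0 when it is missing.
   static int parsePartitionId(String segmentId)
   {
     final int lastUnderscore = segmentId.lastIndexOf('_');
     final String tail = lastUnderscore < 0 ? segmentId : segmentId.substring(lastUnderscore + 1);
     try {
       return Integer.parseInt(tail);
     }
     catch (NumberFormatException e) {
       // The last token is the version timestamp, so the partition ID was omitted.
       return 0;
     }
   }
   ```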
   
   #### Changes in `ShardSpec`
   
   Currently, the partition ID of a segment is tightly coupled with the bucket in hash/range partitioning. For example, in hash partitioning, the partition ID is `(hash value of the partition dimensions) % (partition size)`. In range partitioning, the partition ID is a linearly increasing number assigned to the range-partitioned buckets.
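
   For illustration, a minimal sketch of this coupling in hash partitioning (the hash here is a stand-in, not Druid's actual hash function):

   ```java
   // Hypothetical sketch: in the current hash partitioning, the partition ID is
   // directly the hash bucket of the row's partition dimension values.
   static int currentHashPartitionId(List<Object> partitionDimensionValues, int numPartitions)
   {
     return Math.floorMod(partitionDimensionValues.hashCode(), numPartitions);
   }
   ```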
   
   However, to support append, we need a way to tell whether two given segments fall in the same bucket or not. To do this, the segment needs to keep additional information about its bucket, as below:
   
   ```java
   public interface ShardSpec
   {
   ...
     /**
      * Returns the bucket ID of this partition (segment). A bucket represents the secondary partition
      * in the hash or the range partitioning. This always returns 0 for the linear partitioning.
      */
     short getBucketId();
   ...
   }
   ```
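
   To illustrate how this could be used (a hypothetical helper, not part of the proposed interface), two segments can be checked for membership in the same bucket by comparing their bucket IDs:

   ```java
   // Hypothetical helper: two segments fall in the same bucket when they use the
   // same kind of partitioning and report the same bucket ID.
   static boolean inSameBucket(ShardSpec a, ShardSpec b)
   {
     return a.getClass().equals(b.getClass()) && a.getBucketId() == b.getBucketId();
   }
   ```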
   
   Note that we need to track all segments in the same bucket for easier segment pruning in the brokers. To keep things simple, the partition ID must be _aligned_ with the bucket ID, meaning `partition ID = bucket ID + n * (number of buckets)` (equivalently, `partition ID % (number of buckets) = bucket ID`). For example, with 3 buckets, the segments of bucket 1 would get partition IDs 1, 4, 7, and so on. As a result, the partition IDs might not be contiguous after appending new segments. This alignment is also useful for compatibility because the bucket ID can easily be restored even when it's missing after a downgrade.
   
   This may cause some issues with minor compaction because minor compaction can compact segments only when their partition IDs are adjacent. See the Future work section for this issue.
   
   #### Changes in the Simple task and the Parallel task
   
   The index task needs to allow `partitionsSpec` even when `forceGuaranteedRollup` = false, because appending doesn't necessarily guarantee perfect rollup across the entire datasource. If `appendToExisting` = true and `forceGuaranteedRollup` = false in the ingestionSpec, the indexing task will create _perfectly rolled-up segments_ from the input and append them to the existing datasource. This is to create more compact segments, but I guess it might be useful to not create perfectly rolled-up segments in some cases. Please see the Future work section for more details.
   
   If `appendToExisting` = true, the `partitionsSpec` can be optional (see the sketch after the list below).
   
   - If the target time chunk is empty, the given `partitionsSpec` will be used.
   - If the target time chunk is not empty, the given `partitionsSpec` will be ignored with a warning in the task logs.
     - When the parallel index task appends to a non-empty datasource with range partitioning, it can skip the first phase that determines the best partitioning.
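
   A minimal sketch of this selection (hypothetical method and helper names, not the actual task code):

   ```java
   // Hypothetical sketch: choose the effective partitionsSpec for an appending task.
   static PartitionsSpec chooseEffectivePartitionsSpec(
       List<DataSegment> segmentsInTimeChunk,
       PartitionsSpec requestedSpec
   )
   {
     if (segmentsInTimeChunk.isEmpty()) {
       // Empty time chunk: the given partitionsSpec takes effect.
       return requestedSpec;
     } else {
       // Non-empty time chunk: the given partitionsSpec is ignored with a warning
       // in the task logs, and the partitioning of the existing segments is kept.
       return partitionsSpecOfExistingSegments(segmentsInTimeChunk); // hypothetical helper
     }
   }
   ```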
   
   If `appendToExisting` = true, the indexing task should use the action-based segment allocator, which is a centralized segment allocator in the overlord, to find proper partition IDs for new segments. When requesting new segment IDs, the indexing task should send the proper bucket ID to the overlord. As a result, `SegmentAllocateAction` would be:
   
   ```java
     @JsonCreator
     public SegmentAllocateAction(
         @JsonProperty("dataSource") String dataSource,
         @JsonProperty("timestamp") DateTime timestamp,
         @JsonProperty("queryGranularity") Granularity queryGranularity,
         @JsonProperty("preferredSegmentGranularity") Granularity preferredSegmentGranularity,
         @JsonProperty("sequenceName") String sequenceName,
         @JsonProperty("previousSegmentId") String previousSegmentId,
         @JsonProperty("skipSegmentLineageCheck") boolean skipSegmentLineageCheck,
         @JsonProperty("bucketId") @Nullable Short bucketId, // new field
         @JsonProperty("shardSpecFactory") @Nullable ShardSpecFactory shardSpecFactory, // nullable for backward compatibility
         @JsonProperty("lockGranularity") @Nullable LockGranularity lockGranularity // nullable for backward compatibility
     )
   ...
   ```
   
   The `SegmentAllocateAction` will return a partition ID that is aligned with the given bucket ID.
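
   For illustration, a minimal sketch of how the allocator could pick the next aligned partition ID for the requested bucket (hypothetical names, assuming the alignment rule described above):

   ```java
   // Hypothetical sketch: the next partition ID aligned with the requested bucket,
   // i.e., the smallest unused ID with partitionId % numBuckets == bucketId.
   static int nextAlignedPartitionId(Set<Integer> usedPartitionIds, int bucketId, int numBuckets)
   {
     int candidate = bucketId;
     while (usedPartitionIds.contains(candidate)) {
       candidate += numBuckets; // stay in the same bucket
     }
     return candidate;
   }
   ```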
   
   #### Backward compatibility
   
   - The `shardSpec` computes the bucket ID as `partition ID % core partition size` if it's missing from the JSON. The core partition size is the number of segments (buckets) created by the first task that ingested into the time chunk (see the sketch after this list).
   - The requested bucket ID in the `SegmentAllocateAction` is regarded as 0 if it's missing in the JSON.
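
   For illustration, a minimal sketch of the fallback (hypothetical method, assuming the core partition size is kept in the `shardSpec`):

   ```java
   // Hypothetical sketch of the backward-compatibility fallback: when bucketId is
   // missing from the serialized shardSpec, derive it from the partition ID and
   // the core partition size (the number of buckets created by the first task).
   static short bucketIdWithFallback(@Nullable Short bucketId, int partitionId, int corePartitionSize)
   {
     return bucketId != null ? bucketId : (short) (partitionId % corePartitionSize);
   }
   ```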
   
   ### Rationale
   
   These ideas were considered and dropped.
   
   - The "Multi" input source
     - Just like the Hadoop task, the native task could support the Multi input source to read data from two or more input sources. This indirectly supports a sort of "append" by specifying both the existing segments and the new data in the input source spec. However, the main use case of the Multi input source would be reading from multiple input sources rather than appending.
   - Independent partition ID and bucket ID
     - To make the partition ID and the bucket ID independent, the `VersionedIntervalTimeline` would need to efficiently filter out the segments of the same bucket. This would require an extra index or data structure in the memory of the brokers and the coordinator.
   
   ### Operational impact
   
   - The broker and the coordinator may use a bit more memory because of the new field in the `shardSpec`.
   - Deprecate appending linearly partitioned segments to hash-partitioned datasources
     - `HashBasedNumberedShardSpec` is currently used to allow appending linearly partitioned segments to hash-partitioned datasources. However, I don't see this being very useful once we support appending hash-partitioned segments to hash-partitioned datasources.
   - There should be no issues in rolling upgrades and downgrades.
   
   ### Test plan
   
   - New integration tests will be added.
   - Will test in a real Druid cluster.
   - Will verify there is no issue in rolling upgrades/downgrades.
   
   ### Future work
   
   - When a task creates new segments with hash/range partitioning, it could split a bucket into multiple segments based on the segment size (or number of rows). This can be thought of as supporting a fixed linear partitioning as the _tertiary_ partitioning.
   - Partition-aware minor compaction
     - Minor compaction should be able to compact the segments of the same bucket. This may require changing the meaning of "adjacent partition IDs" per partitioning method.
   - In this proposal, when `appendToExisting` = true and `forceGuaranteedRollup` = false, the indexing task always creates new segments which are perfectly rolled up. We may want to add a new configuration, `partitionWithShuffle`, to control this behavior.
   
   Semi-related: segment pruning in the brokers should also be supported for hash-partitioned datasources.
