You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/03/05 07:59:45 UTC

[GitHub] [druid] maytasm opened a new pull request #10948: Imply 5722

maytasm opened a new pull request #10948:
URL: https://github.com/apache/druid/pull/10948


   Fixes #XXXX.
   
   ### Description
   
   **When does this problem happens:**
   
   This can happens when we have sparse columns in the input data and a dimensionsSpec with dimension ordering that is not lexicographic. Note that this isn’t limited to Compaction but can also occurs in initial batch ingestion of data (index task).
   
   **Root cause:**
   
   This bug occurs when all the dimensions in the intermediate segments does not have a common shared dimension ordering. 
   
   For example, if the dimensions of my data are “dimA”, “dimB” and “dimC”, where both “dimA” and “dimC” are sparse. During ingestion, the following intermediate segments are created:
   
   intermediate segment 1 has column “dimA” and “dimB”. It does not have “dimC” since “dimC” is sparse and does not exist in any rows of intermediate segment 1.
   
   intermediate segment 2 has column “dimB” and “dimC”. It does not have “dimA” since “dimA” is sparse and does not exist in any rows of intermediate segment 2.
   
   When we try to merge all the intermediate segments, we will not be able to find a shared common dimensions across all the intermediate segments (there is no intermediate segment with all “dimA”, “dimB” and “dimC” in it’s index. The current behavior would then fall back into merging all the intermediate segments with a lexicographic ordering of all the dimensions. The final merged segment will have the lexicographic ordering. This can be different than the dimension specified in the ingestionSpec if the dimension specified is not lexicographic ordering.
   
   **How this impact user:**
   
   **Impact 1: Segment size.** When a user specify a dimenionsSpec with ordering of dimension in the ingestionSpec, the ingestion task should creates segments with that ordering. The ordering matters as it impacts the segment size. Since this bug will results in a final segment with lexicographic ordering despite a dimenionsSpec (with ordering of dimension) in the ingestionSpec. 
   
   **Impact 2: Roll up**. When a user specify forceGuaranteedRollup=true (with hash or single dim partitioning), the user expects perfect rollup. If a dimenionsSpec (with ordering of dimension) in the ingestionSpec is specified, that ordering is used to create the intermediate segments. However, because of this bug, the merging of all the intermediate segments would be done with a different ordering (lexicographic) assuming that the ordering in dimenionsSpec is not also lexicographic. This would result in imperfect roll up.
   
   **More detailed example + walkthrough of how the bug occurs:**
   
   Let’s say this is our input data
   
   ```
   {"time":"2015-09-12T00:46:58.771Z","dimA":"C","dimB":"F","metA":1}
   {"time":"2015-09-12T00:46:58.771Z","dimA":"C","dimB":"J","metA":1}
   {"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X","metA":1} **
   {"time":"2015-09-12T00:46:58.771Z","dimA":"Z","dimB":"S","metA":1}
   {"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X","metA":1} **
   {"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"Z","metA":1}
   {"time":"2015-09-12T00:46:58.771Z","dimA":"J","dimB":"R","metA":1}
   {"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"T","metA":1}
   {"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X","metA":1} **
   {"time":"2015-09-12T00:46:58.771Z","dimC":"A","dimB":"X","metA":1}
   ```
   The `**` denotes row that should be roll up together.
   
   Now we will ingest/compact this with `dimB, dimA, dimC` order in the dimensionsSpec. 
   
   Now lets say that the intermediate segments generated are:  
   
   ```
   Segment 1: {"time":"2015-09-12T00:46:58.771Z","dimA":"C","dimB":"F","metA":1}
   Segment 1: {"time":"2015-09-12T00:46:58.771Z","dimA":"C","dimB":"J","metA":1}
   Segment 1: {"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X","metA":1}
   
   Segment 2: {"time":"2015-09-12T00:46:58.771Z","dimA":"Z","dimB":"S","metA":1}
   Segment 2: {"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X","metA":1}
   Segment 2: {"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"Z","metA":1}
   
   Segment 3: {"time":"2015-09-12T00:46:58.771Z","dimA":"J","dimB":"R","metA":1}
   Segment 3: {"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"T","metA":1}
   Segment 3: {"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X","metA":1}
   
   Segment 4: {"time":"2015-09-12T00:46:58.771Z","dimC":"A","dimB":"X","metA":1}
   ```
   and the indexes for these intermediate segments are:  
   
   Note that the ordering of dimensions here are important and that the ordering here follow the ordering of the dimension in the dimensionsSpec of the ingestionSpec (`dimB, dimA, dimC`)
   
   intermediate segments 1: dimB, dimA
   intermediate segments 2: dimB, dimA
   intermediate segments 3: dimB, dimA
   intermediate segments 4: dimB, dimC
   
   The generated intermediate segments rows are sorted by dimension in the ordering of the  dimensionsSpec of the ingestionSpec (`dimB, dimA, dimC`). 
   
   At merging of the intermediate segments, `getLongestSharedDimOrder` returns `null` as there is no common shared dimension ordering across all 4 intermediate segments (none of the index have all dimA, dimB, and dimC). This causes the merging to be done with lexicographic ordering (`dimA, dimB, and dimC`). 
   
   Merging of the intermediate segments iterates through all the indexes in like a BFS way. Basically, it would try to get everything for a given (time, dims) tuple from across all segments before moving to the next tuple. When it moved to the next tuple, it would assume that we are now on a different time, dims tuple. Hence, it is important that the intermediate segments are sorted with the dimension ordering that is used for merging.
   
   Back to our example, since we are now merging with lexicographic ordering (dimA, dimB, and dimC), the first row to be pulled is 
   
   `{"time":"2015-09-12T00:46:58.771Z","dimC":"A","dimB":"X","metA":1}`
   
   since it first looks for the smallest value in dimA, follow by dimB, follow by dimC where null is treated as smallest across all the first rows of all the intermediate segments.
   
   then it move on to 
   
   ` {"time":"2015-09-12T00:46:58.771Z","dimA":"C","dimB":"F","metA":1}` (from intermediate segment 1) then
   
   `{"time":"2015-09-12T00:46:58.771Z","dimA":"C","dimB":"J","metA":1}` (from intermediate segment 1) then
   
   `{"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X","metA":1}` (from intermediate segment 1) then
   
   So far nothing is roll up as we haven’t gotten any (time, dims) tuple that are the same. We have now iterated through all the rows in intermediate segments 4 (first iteration) and intermediate segments 1 (second to fourth iterations). 
   
   Now the next row to be iterate is either the first row of intermediate segments 2 (`{"time":"2015-09-12T00:46:58.771Z","dimA":"Z","dimB":"S","metA":1}`) or the first row of intermediate segment 3 (`{"time":"2015-09-12T00:46:58.771Z","dimA":"J","dimB":"R","metA":1}`). The problem here is that it should have iterate through the rows with `"dimA":"H","dimB":"X"` in intermediate segment 2 and 3 first to be able to combine with the `"dimA":"H","dimB":"X"` already iterated from intermediate segment 1. Just to complete the example, we then iterates `{"time":"2015-09-12T00:46:58.771Z","dimA":"J","dimB":"R","metA":1} `(from intermediate segments 3) first since dimA value of J is smaller than dimA value of Z (from intermediate segments 2). Then we iterate the `{"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"T","metA":1} `(from intermediate segments 3) since dimA value of H is smaller than dimA value of Z (from intermediate segments 2). However, we can no longer roll up the row (from i
 ntermediate segments 3) with the previously iterated row from intermediate segments 1 since there was the` {"time":"2015-09-12T00:46:58.771Z","dimA":"J","dimB":"R","metA":1}` in between which has dimA  != H and dimB != X values.
   
   Purpose Fix:
   
   High level: 
   We will make the dimension names given in the ingest spec available to getLongestShardeDimOrder (As an additional argument). 
   
   We should pass the `schema.getDimensionsSpec().getDimensionNames()` (which is the DimensionNames from the ingestionSpec) from within the `AppendetaorImpl#mergeAndPush` to `indexMerger.mergeQueryableIndex(...)` method. Which would then pass the DimensionNames to the `IndexMergerV9.merge` method. `IndexMergerV9.merge` would pass DimensionNames to the `IndexMerger.getMergedDimensions(indexes)` and finally, we pass it to `getLongestSharedDimOrder(...);`
   
   In `getLongestSharedDimOrder(indexes, specDimensionNames)`, we would then try to find the LongestSharedDim as usually. However, if we cannot find the LongestSharedDim, before returning null, we will do the following in sequence:
   
   1. check if `specDimensionNames` is not null and not empty. → if null or empty, we return null
   2. get the `specDimensionNames` and remove all dimensions that does not exist within the `indexes` dimension names. 
   3. check if `specDimensionNames` does not have extra / missing columns from the set of all `indexes` dimension names. → if it has extra/missing columns, we return null
   4. check that all `indexes` dimension name ordering is the same as the ordering in `specDimensionNames`. → if any index dimension name have a different ordering than `specDimensionNames` we return null.  For example, if `specDimensionNames` is A, B, C and one of the index dimension name is A, C then this is valid, however if one of the index dimension name is C, A then this is invalid and we return null 
   5. return specDimensionNames
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on a change in pull request #10948: If ingested data has sparse columns, the ingested data with forceGuaranteedRollup=true can result in imperfect rollup and final dimension ordering can be different from dimensionSpec ordering in the ingestionSpec

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #10948:
URL: https://github.com/apache/druid/pull/10948#discussion_r596410264



##########
File path: processing/src/main/java/org/apache/druid/segment/IndexMerger.java
##########
@@ -106,6 +118,41 @@
       return null;
     }
 
+    if (isDimensionOrderingValid(indexes, orderingCandidate)) {
+      return ImmutableList.copyOf(orderingCandidate);
+    } else {
+      log.info("Indexes have incompatible dimension orders, try falling back on dimension ordering from ingestionSpec");
+      // Check if there is a valid dimension ordering in the ingestionSpec to fall back on
+      if (dimensionsSpec == null || CollectionUtils.isNullOrEmpty(dimensionsSpec.getDimensionNames())) {
+        log.info("Cannot fall back on dimension ordering from ingestionSpec as it does not exist");
+        return null;
+      }
+      List<String> candidate = dimensionsSpec.getDimensionNames();

Review comment:
       should this be a copy?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm commented on a change in pull request #10948: If ingested data has sparse columns, the ingested data with forceGuaranteedRollup=true can result in imperfect rollup and final dimension ordering can be different from dimensionSpec ordering in the ingestionSpec

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10948:
URL: https://github.com/apache/druid/pull/10948#discussion_r597272564



##########
File path: processing/src/main/java/org/apache/druid/segment/IndexMerger.java
##########
@@ -106,6 +118,41 @@
       return null;
     }
 
+    if (isDimensionOrderingValid(indexes, orderingCandidate)) {
+      return ImmutableList.copyOf(orderingCandidate);
+    } else {
+      log.info("Indexes have incompatible dimension orders, try falling back on dimension ordering from ingestionSpec");
+      // Check if there is a valid dimension ordering in the ingestionSpec to fall back on
+      if (dimensionsSpec == null || CollectionUtils.isNullOrEmpty(dimensionsSpec.getDimensionNames())) {
+        log.info("Cannot fall back on dimension ordering from ingestionSpec as it does not exist");
+        return null;
+      }
+      List<String> candidate = dimensionsSpec.getDimensionNames();

Review comment:
       Good point. From the Guava docs, seems like "The returned list is a transformed view of fromList; changes to fromList will be reflected in the returned list and vice versa." and "To avoid lazy evaluation when the returned list doesn't need to be a view, copy the returned list into a new list of your choosing."
   
   Changed this to make a copy of the returned list from `dimensionsSpec.getDimensionNames()`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm merged pull request #10948: If ingested data has sparse columns, the ingested data with forceGuaranteedRollup=true can result in imperfect rollup and final dimension ordering can be different from dimensionSpec ordering in the ingestionSpec

Posted by GitBox <gi...@apache.org>.
maytasm merged pull request #10948:
URL: https://github.com/apache/druid/pull/10948


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org