You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/08/03 05:38:47 UTC

[GitHub] [incubator-druid] jon-wei commented on a change in pull request #8222: Fix bugs in overshadowableManager and add unit tests

jon-wei commented on a change in pull request #8222: Fix bugs in overshadowableManager and add unit tests
URL: https://github.com/apache/incubator-druid/pull/8222#discussion_r310339728
 
 

 ##########
 File path: core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java
 ##########
 @@ -189,89 +247,359 @@ private void transitAtomicUpdateGroupState(AtomicUpdateGroup<T> atomicUpdateGrou
   }
 
   /**
-   * Find all atomicUpdateGroups of the given state overshadowed by the given rootPartitionRange and minorVersion.
+   * Find all atomicUpdateGroups of the given state overshadowed by the minorVersion in the given rootPartitionRange.
    * The atomicUpdateGroup of a higher minorVersion can have a wider RootPartitionRange.
    * To find all atomicUpdateGroups overshadowed by the given rootPartitionRange and minorVersion,
    * we first need to find the first key contained by the given rootPartitionRange.
    * Once we find such key, then we go through the entire map until we see an atomicUpdateGroup of which
    * rootRangePartition is not contained by the given rootPartitionRange.
+   *
+   * @param rangeOfAug   the partition range to search for overshadowed groups.
+   * @param minorVersion the minor version to check overshadow relation. The found groups will have lower minor versions
+   *                     than this.
+   * @param fromState    the state to search for overshadowed groups.
+   *
+   * @return a list of found atomicUpdateGroups. It could be empty if no groups are found.
+   */
+  @VisibleForTesting
+  List<AtomicUpdateGroup<T>> findOvershadowedBy(RootPartitionRange rangeOfAug, short minorVersion, State fromState)
+  {
+    final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
+    Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = findLowestOverlappingEntry(
+        rangeOfAug,
+        stateMap,
+        true
+    );
+
+    if (current == null) {
+      return Collections.emptyList();
+    }
+
+    // Going through the map to find all entries of the RootPartitionRange contained by the given rangeOfAug.
+    // Note that RootPartitionRange of entries are always consecutive.
+    final List<AtomicUpdateGroup<T>> found = new ArrayList<>();
+    while (current != null && rangeOfAug.overlaps(current.getKey())) {
+      if (rangeOfAug.contains(current.getKey())) {
+        // versionToGroup is sorted by minorVersion.
+        // versionToGroup.headMap(minorVersion) below returns a map containing all entries of lower minorVersions
+        // than the given minorVersion.
+        final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = current.getValue();
+        // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, especially size(), is not optimized.
+        // Note that size() is indirectly called in ArrayList.addAll() when ObjectSortedSet.toArray() is called.
+        // See AbstractObjectCollection.toArray().
+        // If you see performance degradation here, probably we need to improve the below line.
+        if (versionToGroup.firstShortKey() < minorVersion) {
+          found.addAll(versionToGroup.headMap(minorVersion).values());
+        }
+      }
+      current = stateMap.higherEntry(current.getKey());
+    }
+    return found;
+  }
+
+  private List<AtomicUpdateGroup<T>> findOvershadows(AtomicUpdateGroup<T> aug, State fromState)
+  {
+    return findOvershadows(RootPartitionRange.of(aug), aug.getMinorVersion(), fromState);
+  }
+
+  /**
+   * Find all atomicUpdateGroups which overshadow others of the given minorVersion in the given rootPartitionRange.
+   * Similar to {@link #findOvershadowedBy}.
+   *
+   * Note that one atomicUpdateGroup can overshadow multiple other groups. If you're finding overshadowing
+   * atomicUpdateGroups by calling this method in a loop, the results of this method can contain duplicate groups.
+   *
+   * @param rangeOfAug   the partition range to search for overshadowing groups.
+   * @param minorVersion the minor version to check overshadow relation. The found groups will have higher minor
+   *                     versions than this.
+   * @param fromState    the state to search for overshadowed groups.
+   *
+   * @return a list of found atomicUpdateGroups. It could be empty if no groups are found.
    */
-  private List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> findOvershadowedBy(
+  @VisibleForTesting
+  List<AtomicUpdateGroup<T>> findOvershadows(RootPartitionRange rangeOfAug, short minorVersion, State fromState)
+  {
+    final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
+    Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = findLowestOverlappingEntry(
+        rangeOfAug,
+        stateMap,
+        false
+    );
+
+    if (current == null) {
+      return Collections.emptyList();
+    }
+
+    // Going through the map to find all entries whose RootPartitionRange contains the given rangeOfAug.
+    // Note that RootPartitionRange of entries are always consecutive.
+    final List<AtomicUpdateGroup<T>> found = new ArrayList<>();
+    while (current != null && current.getKey().overlaps(rangeOfAug)) {
+      if (current.getKey().contains(rangeOfAug)) {
+        // versionToGroup is sorted by minorVersion.
+        // versionToGroup.tailMap(minorVersion) below returns a map containing all entries of equal to or higher
+        // minorVersions than the given minorVersion.
+        final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = current.getValue();
+        // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, especially size(), is not optimized.
+        // Note that size() is indirectly called in ArrayList.addAll() when ObjectSortedSet.toArray() is called.
+        // See AbstractObjectCollection.toArray().
+        // If you see performance degradation here, probably we need to improve the below line.
+        if (versionToGroup.lastShortKey() > minorVersion) {
+          found.addAll(versionToGroup.tailMap(minorVersion).values());
+        }
+      }
+      current = stateMap.higherEntry(current.getKey());
+    }
+    return found;
+  }
+
+  private Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> findLowestOverlappingEntry(
       RootPartitionRange rangeOfAug,
-      short minorVersion,
-      State fromState
+      TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap,
+      boolean strictSameStartId
   )
   {
-    final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
     Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = stateMap.floorEntry(rangeOfAug);
 
     if (current == null) {
-      return Collections.emptyList();
+      current = stateMap.ceilingEntry(rangeOfAug);
+    }
+
+    if (current == null) {
+      return null;
+    }
+
+    // floorEntry() can return the greatest key less than rangeOfAug. We need to skip non-overlapping keys.
+    while (current != null && !current.getKey().overlaps(rangeOfAug)) {
+      current = stateMap.higherEntry(current.getKey());
+    }
+
+    final BiPredicate<RootPartitionRange, RootPartitionRange> predicate;
+    if (strictSameStartId) {
+      predicate = (entryRange, groupRange) -> entryRange.startPartitionId == groupRange.startPartitionId;
+    } else {
+      predicate = RootPartitionRange::overlaps;
     }
 
-    // Find the first key for searching for overshadowed atomicUpdateGroup
-    while (true) {
+    // There could be multiple entries of the same startPartitionId but different endPartitionId.
+    // Find the first key of the same startPartitionId which has the lowest endPartitionId.
+    while (current != null) {
       final Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> lowerEntry = stateMap.lowerEntry(
           current.getKey()
       );
-      if (lowerEntry != null && lowerEntry.getKey().startPartitionId == rangeOfAug.startPartitionId) {
+      if (lowerEntry != null && predicate.test(lowerEntry.getKey(), rangeOfAug)) {
         current = lowerEntry;
       } else {
         break;
       }
     }
 
-    // Going through the map to find all entries of the RootPartitionRange contained by the given rangeOfAug.
-    // Note that RootPartitionRange of entries are always consecutive.
-    final List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> found = new ArrayList<>();
-    while (current != null && rangeOfAug.contains(current.getKey())) {
-      // versionToGroup is sorted by minorVersion.
-      // versionToGroup.subMap(firstKey, minorVersion) below returns a map containing all entries of lower minorVersions
-      // than the given minorVersion.
-      final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = current.getValue();
-      // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, especially size(), is not optimized.
-      // Note that size() is indirectly called in ArrayList.addAll() when ObjectSortedSet.toArray() is called.
-      // See AbstractObjectCollection.toArray().
-      // If you see performance degradation here, probably we need to improve the below line.
-      found.addAll(versionToGroup.subMap(versionToGroup.firstShortKey(), minorVersion).short2ObjectEntrySet());
-      current = stateMap.higherEntry(current.getKey());
-    }
-    return found;
+    return current;
   }
 
   /**
-   * Handles addition of the atomicUpdateGroup to the given state
+   * Determine the visible group after a new chunk is added.
    */
-  private void transitionStandbyGroupIfFull(AtomicUpdateGroup<T> aug, State stateOfAug)
+  private void determineVisibleGroupAfterAdd(AtomicUpdateGroup<T> aug, State stateOfAug)
   {
     if (stateOfAug == State.STANDBY) {
       // A standby atomicUpdateGroup becomes visible when all its segments are available.
       if (aug.isFull()) {
         // A visible atomicUpdateGroup becomes overshadowed when a fully available standby atomicUpdateGroup becomes
         // visible which overshadows the current visible one.
-        findOvershadowedBy(aug, State.VISIBLE)
-            .forEach(entry -> transitAtomicUpdateGroupState(entry.getValue(), State.VISIBLE, State.OVERSHADOWED));
+        replaceVisibleWith(
+            findOvershadowedBy(aug, State.VISIBLE),
+            State.OVERSHADOWED,
+            Collections.singletonList(aug),
+            State.STANDBY
+        );
         findOvershadowedBy(aug, State.STANDBY)
-            .forEach(entry -> transitAtomicUpdateGroupState(entry.getValue(), State.STANDBY, State.OVERSHADOWED));
-        transitAtomicUpdateGroupState(aug, State.STANDBY, State.VISIBLE);
+            .forEach(entry -> transitAtomicUpdateGroupState(entry, State.STANDBY, State.OVERSHADOWED));
+      } else {
+        // The given atomicUpdateGroup is in the standby state which means it's not overshadowed by the visible group.
+        // If the visible group is not fully available, then the new standby group should be visible since it has a
+        // higher minor version.
+        checkVisibleIsFullyAvailableAndMoveNewStandbyToVisible(aug, stateOfAug);
       }
+    } else if (stateOfAug == State.OVERSHADOWED) {
+      checkVisibleIsFullyAvailableAndTryToMoveOvershadowedToVisible(aug, stateOfAug);
     }
   }
 
-  private void addAtomicUpdateGroupWithState(AtomicUpdateGroup<T> aug, State state)
+  /**
+   * This method is called in {@link #determineVisibleGroupAfterAdd}. It checks the current visible group is
+   * fully available, and if not, moves the given group to the visible state.
+   */
+  private void checkVisibleIsFullyAvailableAndMoveNewStandbyToVisible(
+      AtomicUpdateGroup<T> standbyGroup,
+      State stateOfGroup
+  )
+  {
+    assert stateOfGroup == State.STANDBY;
+    if (!standbyGroup.isEmpty()) {
+      // Check there are visible atomicUpdateGroups overshadowed by the given atomicUpdateGroup.
+      final List<AtomicUpdateGroup<T>> overshadowedVisibles = findOvershadowedBy(
+          standbyGroup,
+          State.VISIBLE
+      );
+      if (overshadowedVisibles.isEmpty()) {
+        // There is no visible atomicUpdateGroup for the rootPartitionRange of the given aug.
+        // The given aug should be visible.
+        transitAtomicUpdateGroupState(standbyGroup, State.STANDBY, State.VISIBLE);
+        findOvershadowedBy(standbyGroup, State.STANDBY)
+            .forEach(entry -> transitAtomicUpdateGroupState(entry, State.STANDBY, State.OVERSHADOWED));
+      } else {
+        // Check there is any missing chunk in the current visible groups.
+        // If the current visible groups don't cover the partition range of the given standby group,
+        // the given standby group should be visible.
+        final boolean fullyCoverAugRange = doGroupsFullyCoverPartitionRange(
+            overshadowedVisibles,
+            standbyGroup.getStartRootPartitionId(),
+            standbyGroup.getEndRootPartitionId()
+        );
+        if (!fullyCoverAugRange) {
+          replaceVisibleWith(
+              overshadowedVisibles,
+              State.OVERSHADOWED,
+              Collections.singletonList(standbyGroup),
+              State.STANDBY
+          );
+          findOvershadowedBy(standbyGroup, State.STANDBY)
+              .forEach(entry -> transitAtomicUpdateGroupState(entry, State.STANDBY, State.OVERSHADOWED));
+
+        }
+        // If all visible atomicUpdateGroups are full, then the given atomicUpdateGroup should stay in the standby
+        // state.
+      }
+    }
+  }
+
+  /**
+   * This method is called in {@link #determineVisibleGroupAfterAdd}. It first checks the current visible group is
+   * fully available. If not, it checkes there are overshadowed groups which can cover the rootPartitionRange of
 
 Review comment:
   checkes -> checks

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org