You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2019/08/07 20:51:31 UTC

[incubator-druid] branch master updated: Fix bugs in overshadowableManager and add unit tests (#8222)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fa114c  Fix bugs in overshadowableManager and add unit tests (#8222)
8fa114c is described below

commit 8fa114c34965e4ad19a3782b60f19ee2036ccb5a
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Wed Aug 7 13:51:21 2019 -0700

    Fix bugs in overshadowableManager and add unit tests (#8222)
    
    * Fix bugs in overshadowableManager and add unit tests
    
    * Fix SegmentManager
    
    * add segment manager test
    
    * Address comments
    
    * Address comments
---
 .../timeline/partition/AtomicUpdateGroup.java      |    6 +-
 .../timeline/partition/OvershadowableManager.java  |  647 ++++++++++--
 .../druid/timeline/partition/PartitionHolder.java  |    6 +-
 .../timeline/VersionedIntervalTimelineTest.java    |  114 +--
 .../partition/IntegerPartitionChunkTest.java       |   41 -
 .../timeline/partition/OvershadowableInteger.java  |  141 +++
 .../partition/OvershadowableManagerTest.java       | 1048 ++++++++++++++++++++
 .../druid/segment/ReferenceCountingSegment.java    |   21 +-
 .../query/select/MultiSegmentSelectQueryTest.java  |   18 +-
 .../timeboundary/TimeBoundaryQueryRunnerTest.java  |   12 +-
 .../segment/ReferenceCountingSegmentTest.java      |    2 +-
 .../apache/druid/segment/realtime/FireHydrant.java |    8 +-
 .../org/apache/druid/server/SegmentManager.java    |   15 +-
 .../apache/druid/server/SegmentManagerTest.java    |   34 +-
 .../util/SpecificSegmentsQuerySegmentWalker.java   |    2 +-
 15 files changed, 1832 insertions(+), 283 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/timeline/partition/AtomicUpdateGroup.java b/core/src/main/java/org/apache/druid/timeline/partition/AtomicUpdateGroup.java
index aed7f15..1386c14 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/AtomicUpdateGroup.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/AtomicUpdateGroup.java
@@ -45,7 +45,7 @@ class AtomicUpdateGroup<T extends Overshadowable<T>> implements Overshadowable<A
   // This may matter if there are a lot of segments to keep in memory as in brokers or the coordinator.
   private final List<PartitionChunk<T>> chunks = new ArrayList<>();
 
-  public AtomicUpdateGroup(PartitionChunk<T> chunk)
+  AtomicUpdateGroup(PartitionChunk<T> chunk)
   {
     this.chunks.add(chunk);
   }
@@ -60,7 +60,7 @@ class AtomicUpdateGroup<T extends Overshadowable<T>> implements Overshadowable<A
     }
     for (PartitionChunk<T> existing : chunks) {
       if (existing.equals(chunk)) {
-        return;
+        throw new ISE("Can't add same chunk[%s] again", chunk);
       }
     }
     chunks.add(chunk);
@@ -95,7 +95,7 @@ class AtomicUpdateGroup<T extends Overshadowable<T>> implements Overshadowable<A
   }
 
   @Nullable
-  public PartitionChunk<T> findChunk(int partitionId)
+  PartitionChunk<T> findChunk(int partitionId)
   {
     return chunks.stream().filter(chunk -> chunk.getChunkNumber() == partitionId).findFirst().orElse(null);
   }
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java b/core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java
index 97e8b9a..d3f090f 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.timeline.partition;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import it.unimi.dsi.fastutil.objects.AbstractObjectCollection;
@@ -41,6 +42,7 @@ import org.apache.druid.timeline.Overshadowable;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -48,11 +50,13 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
 
 /**
- * OvershadowableManager manages the state of {@link AtomicUpdateGroup}. See the below {@link State} for details of
- * the possible state.
+ * OvershadowableManager manages the state of {@link AtomicUpdateGroup}. See the below {@link State} for details.
  * Note that an AtomicUpdateGroup can consist of {@link Overshadowable}s of the same majorVersion, minorVersion,
  * rootPartition range, and atomicUpdateGroupSize.
  * In {@link org.apache.druid.timeline.VersionedIntervalTimeline}, this class is used to manage segments in the same
@@ -62,11 +66,22 @@ import java.util.TreeMap;
  */
 class OvershadowableManager<T extends Overshadowable<T>>
 {
-  private enum State
+  /**
+   * There are 3 states for atomicUpdateGroups.
+   * There could be at most one visible atomicUpdateGroup at any time in a non-empty overshadowableManager.
+   *
+   * - Visible: fully available atomicUpdateGroup of the highest version if any.
+   *            If there's no fully available atomicUpdateGroup, the standby atomicUpdateGroup of the highest version
+   *            becomes visible.
+   * - Standby: all atomicUpdateGroups of higher versions than that of the visible atomicUpdateGroup.
+   * - Overshadowed: all atomicUpdateGroups of lower versions than that of the visible atomicUpdateGroup.
+   */
+  @VisibleForTesting
+  enum State
   {
-    STANDBY, // have atomicUpdateGroup of higher versions than visible
-    VISIBLE, // have a single fully available atomicUpdateGroup of highest version
-    OVERSHADOWED // have atomicUpdateGroup of lower versions than visible
+    STANDBY,
+    VISIBLE,
+    OVERSHADOWED
   }
 
   private final Map<Integer, PartitionChunk<T>> knownPartitionChunks; // served segments
@@ -92,6 +107,16 @@ class OvershadowableManager<T extends Overshadowable<T>>
     this.overshadowedGroups = new TreeMap<>(other.overshadowedGroups);
   }
 
+  private OvershadowableManager(List<AtomicUpdateGroup<T>> groups)
+  {
+    this();
+    for (AtomicUpdateGroup<T> entry : groups) {
+      for (PartitionChunk<T> chunk : entry.getChunks()) {
+        addChunk(chunk);
+      }
+    }
+  }
+
   private TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> getStateMap(State state)
   {
     switch (state) {
@@ -125,7 +150,39 @@ class OvershadowableManager<T extends Overshadowable<T>>
     Preconditions.checkArgument(!atomicUpdateGroup.isEmpty(), "empty atomicUpdateGroup");
 
     removeFrom(atomicUpdateGroup, from);
-    addAtomicUpdateGroupWithState(atomicUpdateGroup, to);
+    addAtomicUpdateGroupWithState(atomicUpdateGroup, to, false);
+  }
+
+  /**
+   * Replace the oldVisibleGroups with the newVisibleGroups.
+   * This method first removes the oldVisibleGroups from the visibles map,
+   * moves the newVisibleGroups from their old state map to the visibles map,
+   * and finally adds the oldVisibleGroups to their new state map.
+   */
+  private void replaceVisibleWith(
+      Collection<AtomicUpdateGroup<T>> oldVisibleGroups,
+      State newStateOfOldVisibleGroup,
+      List<AtomicUpdateGroup<T>> newVisibleGroups,
+      State oldStateOfNewVisibleGroups
+  )
+  {
+    oldVisibleGroups.forEach(
+        group -> {
+          if (!group.isEmpty()) {
+            removeFrom(group, State.VISIBLE);
+          }
+        }
+    );
+    newVisibleGroups.forEach(
+        entry -> transitAtomicUpdateGroupState(entry, oldStateOfNewVisibleGroups, State.VISIBLE)
+    );
+    oldVisibleGroups.forEach(
+        group -> {
+          if (!group.isEmpty()) {
+            addAtomicUpdateGroupWithState(group, newStateOfOldVisibleGroup, false);
+          }
+        }
+    );
   }
 
   /**
@@ -179,7 +236,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
     return null;
   }
 
-  private List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> findOvershadowedBy(
+  private List<AtomicUpdateGroup<T>> findOvershadowedBy(
       AtomicUpdateGroup<T> aug,
       State fromState
   )
@@ -189,89 +246,372 @@ class OvershadowableManager<T extends Overshadowable<T>>
   }
 
   /**
-   * Find all atomicUpdateGroups of the given state overshadowed by the given rootPartitionRange and minorVersion.
+   * Find all atomicUpdateGroups of the given state overshadowed by the minorVersion in the given rootPartitionRange.
    * The atomicUpdateGroup of a higher minorVersion can have a wider RootPartitionRange.
    * To find all atomicUpdateGroups overshadowed by the given rootPartitionRange and minorVersion,
    * we first need to find the first key contained by the given rootPartitionRange.
    * Once we find such key, then we go through the entire map until we see an atomicUpdateGroup of which
    * rootRangePartition is not contained by the given rootPartitionRange.
+   *
+   * @param rangeOfAug   the partition range to search for overshadowed groups.
+   * @param minorVersion the minor version to check overshadow relation. The found groups will have lower minor versions
+   *                     than this.
+   * @param fromState    the state to search for overshadowed groups.
+   *
+   * @return a list of found atomicUpdateGroups. It could be empty if no groups are found.
+   */
+  @VisibleForTesting
+  List<AtomicUpdateGroup<T>> findOvershadowedBy(RootPartitionRange rangeOfAug, short minorVersion, State fromState)
+  {
+    final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
+    Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = findLowestOverlappingEntry(
+        rangeOfAug,
+        stateMap,
+        true
+    );
+
+    if (current == null) {
+      return Collections.emptyList();
+    }
+
+    // Going through the map to find all entries of the RootPartitionRange contained by the given rangeOfAug.
+    // Note that RootPartitionRange of entries are always consecutive.
+    final List<AtomicUpdateGroup<T>> found = new ArrayList<>();
+    while (current != null && rangeOfAug.overlaps(current.getKey())) {
+      if (rangeOfAug.contains(current.getKey())) {
+        // versionToGroup is sorted by minorVersion.
+        // versionToGroup.headMap(minorVersion) below returns a map containing all entries of lower minorVersions
+        // than the given minorVersion.
+        final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = current.getValue();
+        // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, especially size(), is not optimized.
+        // Note that size() is indirectly called in ArrayList.addAll() when ObjectSortedSet.toArray() is called.
+        // See AbstractObjectCollection.toArray().
+        // If you see performance degradation here, probably we need to improve the below line.
+        if (versionToGroup.firstShortKey() < minorVersion) {
+          found.addAll(versionToGroup.headMap(minorVersion).values());
+        }
+      }
+      current = stateMap.higherEntry(current.getKey());
+    }
+    return found;
+  }
+
+  private List<AtomicUpdateGroup<T>> findOvershadows(AtomicUpdateGroup<T> aug, State fromState)
+  {
+    return findOvershadows(RootPartitionRange.of(aug), aug.getMinorVersion(), fromState);
+  }
+
+  /**
+   * Find all atomicUpdateGroups which overshadow others of the given minorVersion in the given rootPartitionRange.
+   * Similar to {@link #findOvershadowedBy}.
+   *
+   * Note that one atomicUpdateGroup can overshadow multiple other groups. If you're finding overshadowing
+   * atomicUpdateGroups by calling this method in a loop, the results of this method can contain duplicate groups.
+   *
+   * @param rangeOfAug   the partition range to search for overshadowing groups.
+   * @param minorVersion the minor version to check overshadow relation. The found groups will have higher minor
+   *                     versions than this.
+   * @param fromState    the state to search for overshadowing groups.
+   *
+   * @return a list of found atomicUpdateGroups. It could be empty if no groups are found.
+   */
+  @VisibleForTesting
+  List<AtomicUpdateGroup<T>> findOvershadows(RootPartitionRange rangeOfAug, short minorVersion, State fromState)
+  {
+    final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
+    Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = findLowestOverlappingEntry(
+        rangeOfAug,
+        stateMap,
+        false
+    );
+
+    if (current == null) {
+      return Collections.emptyList();
+    }
+
+    // Going through the map to find all entries whose RootPartitionRange contains the given rangeOfAug.
+    // Note that RootPartitionRange of entries are always consecutive.
+    final List<AtomicUpdateGroup<T>> found = new ArrayList<>();
+    while (current != null && current.getKey().overlaps(rangeOfAug)) {
+      if (current.getKey().contains(rangeOfAug)) {
+        // versionToGroup is sorted by minorVersion.
+        // versionToGroup.tailMap(minorVersion) below returns a map containing all entries of equal to or higher
+        // minorVersions than the given minorVersion.
+        final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = current.getValue();
+        // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, especially size(), is not optimized.
+        // Note that size() is indirectly called in ArrayList.addAll() when ObjectSortedSet.toArray() is called.
+        // See AbstractObjectCollection.toArray().
+        // If you see performance degradation here, probably we need to improve the below line.
+        if (versionToGroup.lastShortKey() > minorVersion) {
+          found.addAll(versionToGroup.tailMap(minorVersion).values());
+        }
+      }
+      current = stateMap.higherEntry(current.getKey());
+    }
+    return found;
+  }
+
+  /**
+   * Finds the lowest entry overlapping with the given root partition range.
+   * It first searches the entries lower than or equal to the given range.
+   * If there's no such entry lower than the given range, then it searches the entries higher than the given range.
+   *
+   * @return an entry of the lowest key overlapping with the given range. Otherwise null.
    */
-  private List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> findOvershadowedBy(
+  @Nullable
+  private Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> findLowestOverlappingEntry(
       RootPartitionRange rangeOfAug,
-      short minorVersion,
-      State fromState
+      TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap,
+      boolean strictSameStartId
   )
   {
-    final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
+    // Searches the entries lower than or equal to the given range.
     Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = stateMap.floorEntry(rangeOfAug);
 
     if (current == null) {
-      return Collections.emptyList();
+      // Searches the entries higher than the given range.
+      current = stateMap.higherEntry(rangeOfAug);
+    }
+
+    if (current == null) {
+      return null;
+    }
+
+    // floorEntry() can return the greatest key less than rangeOfAug. We need to skip non-overlapping keys.
+    while (current != null && !current.getKey().overlaps(rangeOfAug)) {
+      current = stateMap.higherEntry(current.getKey());
+    }
+
+    final BiPredicate<RootPartitionRange, RootPartitionRange> predicate;
+    if (strictSameStartId) {
+      predicate = (entryRange, groupRange) -> entryRange.startPartitionId == groupRange.startPartitionId;
+    } else {
+      predicate = RootPartitionRange::overlaps;
     }
 
-    // Find the first key for searching for overshadowed atomicUpdateGroup
-    while (true) {
+    // There could be multiple entries of the same startPartitionId but different endPartitionId.
+    // Find the first key of the same startPartitionId which has the lowest endPartitionId.
+    while (current != null) {
       final Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> lowerEntry = stateMap.lowerEntry(
           current.getKey()
       );
-      if (lowerEntry != null && lowerEntry.getKey().startPartitionId == rangeOfAug.startPartitionId) {
+      if (lowerEntry != null && predicate.test(lowerEntry.getKey(), rangeOfAug)) {
         current = lowerEntry;
       } else {
         break;
       }
     }
 
-    // Going through the map to find all entries of the RootPartitionRange contained by the given rangeOfAug.
-    // Note that RootPartitionRange of entries are always consecutive.
-    final List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> found = new ArrayList<>();
-    while (current != null && rangeOfAug.contains(current.getKey())) {
-      // versionToGroup is sorted by minorVersion.
-      // versionToGroup.subMap(firstKey, minorVersion) below returns a map containing all entries of lower minorVersions
-      // than the given minorVersion.
-      final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = current.getValue();
-      // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, especially size(), is not optimized.
-      // Note that size() is indirectly called in ArrayList.addAll() when ObjectSortedSet.toArray() is called.
-      // See AbstractObjectCollection.toArray().
-      // If you see performance degradation here, probably we need to improve the below line.
-      found.addAll(versionToGroup.subMap(versionToGroup.firstShortKey(), minorVersion).short2ObjectEntrySet());
-      current = stateMap.higherEntry(current.getKey());
-    }
-    return found;
+    return current;
   }
 
   /**
-   * Handles addition of the atomicUpdateGroup to the given state
+   * Determine the visible group after a new chunk is added.
    */
-  private void transitionStandbyGroupIfFull(AtomicUpdateGroup<T> aug, State stateOfAug)
+  private void determineVisibleGroupAfterAdd(AtomicUpdateGroup<T> aug, State stateOfAug)
   {
     if (stateOfAug == State.STANDBY) {
-      // A standby atomicUpdateGroup becomes visible when its all segments are available.
-      if (aug.isFull()) {
-        // A visible atomicUpdateGroup becomes overshadowed when a fully available standby atomicUpdateGroup becomes
-        // visible which overshadows the current visible one.
-        findOvershadowedBy(aug, State.VISIBLE)
-            .forEach(entry -> transitAtomicUpdateGroupState(entry.getValue(), State.VISIBLE, State.OVERSHADOWED));
-        findOvershadowedBy(aug, State.STANDBY)
-            .forEach(entry -> transitAtomicUpdateGroupState(entry.getValue(), State.STANDBY, State.OVERSHADOWED));
-        transitAtomicUpdateGroupState(aug, State.STANDBY, State.VISIBLE);
+      moveNewStandbyToVisibleIfNecessary(aug, stateOfAug);
+    } else if (stateOfAug == State.OVERSHADOWED) {
+      checkVisibleIsFullyAvailableAndTryToMoveOvershadowedToVisible(aug, stateOfAug);
+    }
+  }
+
+  /**
+   * This method is called in {@link #determineVisibleGroupAfterAdd}.
+   * The given standby group can be visible in the below two cases:
+   *
+   * - The standby group is full. Since every standby group has a higher version than the current visible group,
+   *   it should become visible immediately when it's full.
+   * - The standby group is not full but not empty and the current visible is not full. If there's no fully available
+   *   group, the group of the highest version should be the visible.
+   */
+  private void moveNewStandbyToVisibleIfNecessary(AtomicUpdateGroup<T> standbyGroup, State stateOfGroup)
+  {
+    assert stateOfGroup == State.STANDBY;
+
+    // A standby atomicUpdateGroup becomes visible when all its segments are available.
+    if (standbyGroup.isFull()) {
+      // A current visible atomicUpdateGroup becomes overshadowed when a fully available standby atomicUpdateGroup
+      // becomes visible.
+      replaceVisibleWith(
+          findOvershadowedBy(standbyGroup, State.VISIBLE),
+          State.OVERSHADOWED,
+          Collections.singletonList(standbyGroup),
+          State.STANDBY
+      );
+      findOvershadowedBy(standbyGroup, State.STANDBY)
+          .forEach(entry -> transitAtomicUpdateGroupState(entry, State.STANDBY, State.OVERSHADOWED));
+    } else {
+      // The given atomicUpdateGroup is in the standby state which means it's not overshadowed by the visible group.
+      // If the visible group is not fully available, then the new standby group should be visible since it has a
+      // higher minor version.
+      if (!standbyGroup.isEmpty()) {
+        // Check there are visible atomicUpdateGroups overshadowed by the given atomicUpdateGroup.
+        final List<AtomicUpdateGroup<T>> overshadowedVisibles = findOvershadowedBy(
+            standbyGroup,
+            State.VISIBLE
+        );
+        if (overshadowedVisibles.isEmpty()) {
+          // There is no visible atomicUpdateGroup for the rootPartitionRange of the given aug.
+          // The given aug should be visible.
+          transitAtomicUpdateGroupState(standbyGroup, State.STANDBY, State.VISIBLE);
+          findOvershadowedBy(standbyGroup, State.STANDBY)
+              .forEach(entry -> transitAtomicUpdateGroupState(entry, State.STANDBY, State.OVERSHADOWED));
+        } else {
+          // Check there is any missing chunk in the current visible groups.
+          // If the current visible groups don't cover the partition range of the given standby group,
+          // the given standby group should be visible.
+          final boolean fullyCoverAugRange = doGroupsFullyCoverPartitionRange(
+              overshadowedVisibles,
+              standbyGroup.getStartRootPartitionId(),
+              standbyGroup.getEndRootPartitionId()
+          );
+          if (!fullyCoverAugRange) {
+            replaceVisibleWith(
+                overshadowedVisibles,
+                State.OVERSHADOWED,
+                Collections.singletonList(standbyGroup),
+                State.STANDBY
+            );
+            findOvershadowedBy(standbyGroup, State.STANDBY)
+                .forEach(entry -> transitAtomicUpdateGroupState(entry, State.STANDBY, State.OVERSHADOWED));
+
+          }
+          // If all visible atomicUpdateGroups are full, then the given atomicUpdateGroup should stay in the standby
+          // state.
+        }
+      }
+    }
+  }
+
+  /**
+   * This method is called in {@link #determineVisibleGroupAfterAdd}. It first checks the current visible group is
+   * fully available. If not, it checks there are overshadowed groups which can cover the rootPartitionRange of
+   * the visible groups and are fully available. If it finds such groups, they become visible.
+   */
+  private void checkVisibleIsFullyAvailableAndTryToMoveOvershadowedToVisible(
+      AtomicUpdateGroup<T> group,
+      State stateOfGroup
+  )
+  {
+    assert stateOfGroup == State.OVERSHADOWED;
+    if (group.isFull()) {
+      // Since this atomicUpdateGroup is full, it could be changed to visible if the current visible group is not
+      // fully available. To check this, we first check the current visible is fully available.
+      // And if not, we check the overshadowed groups are fully available and can cover the partition range of
+      // the atomicUpdateGroups overshadowing the given overshadowed group.
+
+      // Visible or standby groups overshadowing the given group.
+      // Used to both 1) check fully available visible group and
+      // 2) get the partition range which the fully available overshadowed groups should cover to become visible.
+      final List<AtomicUpdateGroup<T>> groupsOvershadowingAug;
+      final boolean isOvershadowingGroupsFull;
+
+      final List<AtomicUpdateGroup<T>> overshadowingVisibles = findOvershadows(group, State.VISIBLE);
+      if (overshadowingVisibles.isEmpty()) {
+        final List<AtomicUpdateGroup<T>> overshadowingStandbys = findLatestNonFullyAvailableAtomicUpdateGroups(
+            findOvershadows(group, State.STANDBY)
+        );
+        if (overshadowingStandbys.isEmpty()) {
+          throw new ISE("WTH? atomicUpdateGroup[%s] is in overshadowed state, but no one overshadows it?", group);
+        }
+        groupsOvershadowingAug = overshadowingStandbys;
+        isOvershadowingGroupsFull = false;
+      } else {
+        groupsOvershadowingAug = overshadowingVisibles;
+        isOvershadowingGroupsFull = doGroupsFullyCoverPartitionRange(
+            groupsOvershadowingAug,
+            groupsOvershadowingAug.get(0).getStartRootPartitionId(),
+            groupsOvershadowingAug.get(groupsOvershadowingAug.size() - 1).getEndRootPartitionId()
+        );
+      }
+
+      // If groupsOvershadowingAug is the standby groups, isOvershadowingGroupsFull is always false.
+      // If groupsOvershadowingAug is the visible groups, isOvershadowingGroupsFull indicates the visible group is
+      // fully available or not.
+      if (!isOvershadowingGroupsFull) {
+        // Let's check the overshadowed groups can cover the partition range of groupsOvershadowingAug
+        // and are fully available.
+        final List<AtomicUpdateGroup<T>> latestFullGroups = groupsOvershadowingAug
+            .stream()
+            .flatMap(eachFullgroup -> findLatestFullyAvailableOvershadowedAtomicUpdateGroups(
+                RootPartitionRange.of(eachFullgroup),
+                eachFullgroup.getMinorVersion()).stream()
+            )
+            .collect(Collectors.toList());
+
+        if (!latestFullGroups.isEmpty()) {
+          final boolean isOvershadowedGroupsFull = doGroupsFullyCoverPartitionRange(
+              latestFullGroups,
+              groupsOvershadowingAug.get(0).getStartRootPartitionId(),
+              groupsOvershadowingAug.get(groupsOvershadowingAug.size() - 1).getEndRootPartitionId()
+          );
+
+          if (isOvershadowedGroupsFull) {
+            replaceVisibleWith(overshadowingVisibles, State.STANDBY, latestFullGroups, State.OVERSHADOWED);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks if the given groups fully cover the given partition range. To fully cover the range, the given groups
+   * should satisfy the below:
+   *
+   * - All groups must be full.
+   * - All groups must be adjacent.
+   * - The lowest startPartitionId and the highest endPartitionId must be the same as the given startPartitionId and
+   *   the given endPartitionId, respectively.
+   *
+   * @param groups               atomicUpdateGroups sorted by their rootPartitionRange
+   * @param startRootPartitionId the start partitionId of the root partition range to check the coverage
+   * @param endRootPartitionId   the end partitionId of the root partition range to check the coverage
+   *
+   * @return true if the given groups fully cover the given partition range.
+   */
+  private boolean doGroupsFullyCoverPartitionRange(
+      List<AtomicUpdateGroup<T>> groups,
+      int startRootPartitionId,
+      int endRootPartitionId
+  )
+  {
+    final int startRootPartitionIdOfOvershadowed = groups.get(0).getStartRootPartitionId();
+    final int endRootPartitionIdOfOvershadowed = groups.get(groups.size() - 1).getEndRootPartitionId();
+    if (startRootPartitionId != startRootPartitionIdOfOvershadowed
+        || endRootPartitionId != endRootPartitionIdOfOvershadowed) {
+      return false;
+    } else {
+      int prevEndPartitionId = groups.get(0).getStartRootPartitionId();
+      for (AtomicUpdateGroup<T> group : groups) {
+        if (!group.isFull() || prevEndPartitionId != group.getStartRootPartitionId()) {
+          // If any visible atomicUpdateGroup overshadowed by the given standby atomicUpdateGroup is not full,
+          // then the given atomicUpdateGroup should be visible since it has a higher version.
+          return false;
+        }
+        prevEndPartitionId = group.getEndRootPartitionId();
       }
     }
+    return true;
   }
 
-  private void addAtomicUpdateGroupWithState(AtomicUpdateGroup<T> aug, State state)
+  private void addAtomicUpdateGroupWithState(AtomicUpdateGroup<T> aug, State state, boolean determineVisible)
   {
     final AtomicUpdateGroup<T> existing = getStateMap(state)
         .computeIfAbsent(RootPartitionRange.of(aug), k -> createMinorVersionToAugMap(state))
         .put(aug.getMinorVersion(), aug);
 
     if (existing != null) {
-      throw new ISE("AtomicUpdateGroup[%s] is already in state[%s]", aug, state);
+      throw new ISE("AtomicUpdateGroup[%s] is already in state[%s]", existing, state);
     }
 
-    transitionStandbyGroupIfFull(aug, state);
+    if (determineVisible) {
+      determineVisibleGroupAfterAdd(aug, state);
+    }
   }
 
-  public boolean addChunk(PartitionChunk<T> chunk)
+  boolean addChunk(PartitionChunk<T> chunk)
   {
     // Sanity check. ExistingChunk should be usually null.
     final PartitionChunk<T> existingChunk = knownPartitionChunks.put(chunk.getChunkNumber(), chunk);
@@ -295,17 +635,36 @@ class OvershadowableManager<T extends Overshadowable<T>>
 
     if (atomicUpdateGroup != null) {
       atomicUpdateGroup.add(chunk);
+      // If overshadowed atomicUpdateGroup is full and visible atomicUpdateGroup is not full,
+      // move overshadowed one to visible.
+      determineVisibleGroupAfterAdd(atomicUpdateGroup, State.OVERSHADOWED);
     } else {
       atomicUpdateGroup = findAtomicUpdateGroupWith(chunk, State.STANDBY);
 
       if (atomicUpdateGroup != null) {
         atomicUpdateGroup.add(chunk);
-        transitionStandbyGroupIfFull(atomicUpdateGroup, State.STANDBY);
+        determineVisibleGroupAfterAdd(atomicUpdateGroup, State.STANDBY);
       } else {
         atomicUpdateGroup = findAtomicUpdateGroupWith(chunk, State.VISIBLE);
 
         if (atomicUpdateGroup != null) {
-          atomicUpdateGroup.add(chunk);
+          if (atomicUpdateGroup.findChunk(chunk.getChunkNumber()) == null) {
+            // If this chunk is not in the atomicUpdateGroup, then we add the chunk to it if it's not full.
+            if (!atomicUpdateGroup.isFull()) {
+              atomicUpdateGroup.add(chunk);
+            } else {
+              throw new ISE("Can't add chunk[%s] to a full atomicUpdateGroup[%s]", chunk, atomicUpdateGroup);
+            }
+          } else {
+            // If this chunk is already in the atomicUpdateGroup, it should be in knownPartitionChunks
+            // and this code must not be executed.
+            throw new ISE(
+                "WTH? chunk[%s] is in the atomicUpdateGroup[%s] but not in knownPartitionChunks[%s]?",
+                chunk,
+                atomicUpdateGroup,
+                knownPartitionChunks
+            );
+          }
         } else {
           final AtomicUpdateGroup<T> newAtomicUpdateGroup = new AtomicUpdateGroup<>(chunk);
 
@@ -317,9 +676,9 @@ class OvershadowableManager<T extends Overshadowable<T>>
               .anyMatch(group -> group.overshadows(newAtomicUpdateGroup));
 
           if (overshadowed) {
-            addAtomicUpdateGroupWithState(newAtomicUpdateGroup, State.OVERSHADOWED);
+            addAtomicUpdateGroupWithState(newAtomicUpdateGroup, State.OVERSHADOWED, true);
           } else {
-            addAtomicUpdateGroupWithState(newAtomicUpdateGroup, State.STANDBY);
+            addAtomicUpdateGroupWithState(newAtomicUpdateGroup, State.STANDBY, true);
           }
         }
       }
@@ -328,7 +687,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
   }
 
   /**
-   * Handles of removal of an empty atomicUpdateGroup from a state.
+   * Handles the removal of an empty atomicUpdateGroup from a state.
    */
   private void determineVisibleGroupAfterRemove(
       AtomicUpdateGroup<T> augOfRemovedChunk,
@@ -343,33 +702,101 @@ class OvershadowableManager<T extends Overshadowable<T>>
     // is removed.
 
     if (stateOfRemovedAug == State.VISIBLE) {
-      // All segments in the visible atomicUpdateGroup which overshadows this atomicUpdateGroup is removed.
-      // Fall back if there is a fully available overshadowed atomicUpdateGroup
-
-      final List<AtomicUpdateGroup<T>> latestFullAugs = findLatestFullyAvailableOvershadowedAtomicUpdateGroup(
+      // A chunk is removed from the current visible group.
+      // Fall back to
+      //   1) the latest fully available overshadowed group if any
+      //   2) the latest standby group if any
+      //   3) the latest overshadowed group if any
+
+      // Check whether there is a fully available latest overshadowed atomicUpdateGroup.
+      final List<AtomicUpdateGroup<T>> latestFullAugs = findLatestFullyAvailableOvershadowedAtomicUpdateGroups(
           rangeOfAug,
           minorVersion
       );
 
-      // If there is no fully available fallback group, then the existing VISIBLE group remains VISIBLE.
-      // Otherwise, the latest fully available group becomes VISIBLE.
+      // If there are fully available overshadowed groups, then the latest one becomes visible.
       if (!latestFullAugs.isEmpty()) {
-        // Move the atomicUpdateGroup to standby
-        // and move the fully available overshadowed atomicUpdateGroup to visible
-        if (!augOfRemovedChunk.isEmpty()) {
-          transitAtomicUpdateGroupState(augOfRemovedChunk, State.VISIBLE, State.STANDBY);
+        // The current visible atomicUpdateGroup becomes standby
+        // and the fully available overshadowed atomicUpdateGroups become visible
+        final Set<AtomicUpdateGroup<T>> overshadowsLatestFullAugsInVisible = latestFullAugs
+            .stream()
+            .flatMap(group -> findOvershadows(group, State.VISIBLE).stream())
+            .collect(Collectors.toSet());
+        replaceVisibleWith(
+            overshadowsLatestFullAugsInVisible,
+            State.STANDBY,
+            latestFullAugs,
+            State.OVERSHADOWED
+        );
+        latestFullAugs
+            .stream()
+            .flatMap(group -> findOvershadows(group, State.OVERSHADOWED).stream())
+            .collect(Collectors.toSet())
+            .forEach(group -> transitAtomicUpdateGroupState(group, State.OVERSHADOWED, State.STANDBY));
+      } else {
+        // Find the latest non-fully available atomicUpdateGroups
+        final List<AtomicUpdateGroup<T>> latestStandby = findLatestNonFullyAvailableAtomicUpdateGroups(
+            findOvershadows(rangeOfAug, minorVersion, State.STANDBY)
+        );
+        if (!latestStandby.isEmpty()) {
+          final List<AtomicUpdateGroup<T>> overshadowedByLatestStandby = latestStandby
+              .stream()
+              .flatMap(group -> findOvershadowedBy(group, State.VISIBLE).stream())
+              .collect(Collectors.toList());
+          replaceVisibleWith(overshadowedByLatestStandby, State.OVERSHADOWED, latestStandby, State.STANDBY);
+
+          // All standby groups overshadowed by the new visible group should be moved to overshadowed
+          latestStandby
+              .stream()
+              .flatMap(group -> findOvershadowedBy(group, State.STANDBY).stream())
+              .collect(Collectors.toSet())
+              .forEach(aug -> transitAtomicUpdateGroupState(aug, State.STANDBY, State.OVERSHADOWED));
+        } else if (augOfRemovedChunk.isEmpty()) {
+          // Visible is empty. Move the latest overshadowed to visible.
+          final List<AtomicUpdateGroup<T>> latestOvershadowed = findLatestNonFullyAvailableAtomicUpdateGroups(
+              findOvershadowedBy(rangeOfAug, minorVersion, State.OVERSHADOWED)
+          );
+          if (!latestOvershadowed.isEmpty()) {
+            latestOvershadowed.forEach(aug -> transitAtomicUpdateGroupState(aug, State.OVERSHADOWED, State.VISIBLE));
+          }
         }
-        latestFullAugs.forEach(group -> transitAtomicUpdateGroupState(group, State.OVERSHADOWED, State.VISIBLE));
       }
     }
   }
 
-  private List<AtomicUpdateGroup<T>> findLatestFullyAvailableOvershadowedAtomicUpdateGroup(
+  /**
+   * Find the latest NON-FULLY available atomicUpdateGroups from the given groups.
+   *
+   * This method MUST be called only when there are no fully available ones in the given groups. If the given groups
+   * are in the overshadowed state, call {@link #findLatestFullyAvailableOvershadowedAtomicUpdateGroups} first
+   * to check whether there is any fully available group.
+   * If the given groups are in the standby state, you can freely call this method because there should be no fully
+   * available one in the standby groups at any time.
+   */
+  private List<AtomicUpdateGroup<T>> findLatestNonFullyAvailableAtomicUpdateGroups(List<AtomicUpdateGroup<T>> groups)
+  {
+    if (groups.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    final OvershadowableManager<T> manager = new OvershadowableManager<>(groups);
+    if (!manager.standbyGroups.isEmpty()) {
+      throw new ISE("This method should be called only when there is no fully available group in the given state.");
+    }
+
+    final List<AtomicUpdateGroup<T>> visibles = new ArrayList<>();
+    for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> map : manager.visibleGroup.values()) {
+      visibles.addAll(map.values());
+    }
+    return visibles;
+  }
+
+  private List<AtomicUpdateGroup<T>> findLatestFullyAvailableOvershadowedAtomicUpdateGroups(
       RootPartitionRange rangeOfAug,
       short minorVersion
   )
   {
-    final List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> overshadowedGroups = findOvershadowedBy(
+    final List<AtomicUpdateGroup<T>> overshadowedGroups = findOvershadowedBy(
         rangeOfAug,
         minorVersion,
         State.OVERSHADOWED
@@ -378,16 +805,22 @@ class OvershadowableManager<T extends Overshadowable<T>>
       return Collections.emptyList();
     }
 
-    final OvershadowableManager<T> manager = new OvershadowableManager<>();
-    for (Short2ObjectMap.Entry<AtomicUpdateGroup<T>> entry : overshadowedGroups) {
-      for (PartitionChunk<T> chunk : entry.getValue().getChunks()) {
-        manager.addChunk(chunk);
-      }
-    }
-
+    final OvershadowableManager<T> manager = new OvershadowableManager<>(overshadowedGroups);
     final List<AtomicUpdateGroup<T>> visibles = new ArrayList<>();
     for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> map : manager.visibleGroup.values()) {
-      visibles.addAll(map.values());
+      for (AtomicUpdateGroup<T> atomicUpdateGroup : map.values()) {
+        if (!atomicUpdateGroup.isFull()) {
+          return Collections.emptyList();
+        }
+        visibles.add(atomicUpdateGroup);
+      }
+    }
+    final RootPartitionRange foundRange = RootPartitionRange.of(
+        visibles.get(0).getStartRootPartitionId(),
+        visibles.get(visibles.size() - 1).getEndRootPartitionId()
+    );
+    if (!rangeOfAug.equals(foundRange)) {
+      return Collections.emptyList();
     }
     return visibles;
   }
@@ -419,7 +852,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
   }
 
   @Nullable
-  public PartitionChunk<T> removeChunk(PartitionChunk<T> partitionChunk)
+  PartitionChunk<T> removeChunk(PartitionChunk<T> partitionChunk)
   {
     final PartitionChunk<T> knownChunk = knownPartitionChunks.get(partitionChunk.getChunkNumber());
     if (knownChunk == null) {
@@ -461,7 +894,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
   }
 
   @Nullable
-  public PartitionChunk<T> getChunk(int partitionId)
+  PartitionChunk<T> getChunk(int partitionId)
   {
     final PartitionChunk<T> chunk = knownPartitionChunks.get(partitionId);
     if (chunk == null) {
@@ -480,26 +913,33 @@ class OvershadowableManager<T extends Overshadowable<T>>
     }
   }
 
-  public List<PartitionChunk<T>> getVisibles()
+  List<PartitionChunk<T>> getVisibleChunks()
   {
-    final List<PartitionChunk<T>> visibles = new ArrayList<>();
-    for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> treeMap : visibleGroup.values()) {
-      for (AtomicUpdateGroup<T> aug : treeMap.values()) {
-        visibles.addAll(aug.getChunks());
-      }
-    }
-    return visibles;
+    return getAllChunks(visibleGroup);
+  }
+
+  List<PartitionChunk<T>> getOvershadowedChunks()
+  {
+    return getAllChunks(overshadowedGroups);
+  }
+
+  @VisibleForTesting
+  List<PartitionChunk<T>> getStandbyChunks()
+  {
+    return getAllChunks(standbyGroups);
   }
 
-  public List<PartitionChunk<T>> getOvershadowed()
+  private List<PartitionChunk<T>> getAllChunks(
+      TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap
+  )
   {
-    final List<PartitionChunk<T>> overshadowed = new ArrayList<>();
-    for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> treeMap : overshadowedGroups.values()) {
+    final List<PartitionChunk<T>> allChunks = new ArrayList<>();
+    for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> treeMap : stateMap.values()) {
       for (AtomicUpdateGroup<T> aug : treeMap.values()) {
-        overshadowed.addAll(aug.getChunks());
+        allChunks.addAll(aug.getChunks());
       }
     }
-    return overshadowed;
+    return allChunks;
   }
 
   @Override
@@ -535,11 +975,18 @@ class OvershadowableManager<T extends Overshadowable<T>>
            '}';
   }
 
-  private static class RootPartitionRange implements Comparable<RootPartitionRange>
+  @VisibleForTesting
+  static class RootPartitionRange implements Comparable<RootPartitionRange>
   {
     private final short startPartitionId;
     private final short endPartitionId;
 
+    @VisibleForTesting
+    static RootPartitionRange of(int startPartitionId, int endPartitionId)
+    {
+      return new RootPartitionRange((short) startPartitionId, (short) endPartitionId);
+    }
+
     private static <T extends Overshadowable<T>> RootPartitionRange of(PartitionChunk<T> chunk)
     {
       return of(chunk.getObject().getStartRootPartitionId(), chunk.getObject().getEndRootPartitionId());
@@ -550,11 +997,6 @@ class OvershadowableManager<T extends Overshadowable<T>>
       return of(aug.getStartRootPartitionId(), aug.getEndRootPartitionId());
     }
 
-    private static RootPartitionRange of(int startPartitionId, int endPartitionId)
-    {
-      return new RootPartitionRange((short) startPartitionId, (short) endPartitionId);
-    }
-
     private RootPartitionRange(short startPartitionId, short endPartitionId)
     {
       this.startPartitionId = startPartitionId;
@@ -563,7 +1005,16 @@ class OvershadowableManager<T extends Overshadowable<T>>
 
     public boolean contains(RootPartitionRange that)
     {
-      return this.startPartitionId <= that.startPartitionId && this.endPartitionId >= that.endPartitionId;
+      return Short.toUnsignedInt(startPartitionId) <= Short.toUnsignedInt(that.startPartitionId)
+             && Short.toUnsignedInt(this.endPartitionId) >= Short.toUnsignedInt(that.endPartitionId);
+    }
+
+    public boolean overlaps(RootPartitionRange that)
+    {
+      return Short.toUnsignedInt(startPartitionId) <= Short.toUnsignedInt(that.startPartitionId)
+          && Short.toUnsignedInt(endPartitionId) > Short.toUnsignedInt(that.startPartitionId)
+          || Short.toUnsignedInt(startPartitionId) >= Short.toUnsignedInt(that.startPartitionId)
+          && Short.toUnsignedInt(startPartitionId) < Short.toUnsignedInt(that.endPartitionId);
     }
 
     @Override
@@ -638,7 +1089,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
       if (toKey > key) {
         return this;
       } else {
-        throw new IllegalArgumentException();
+        throw new IAE("toKey: %s, key: %s", toKey, key);
       }
     }
 
@@ -648,7 +1099,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
       if (fromKey <= key) {
         return this;
       } else {
-        throw new IllegalArgumentException();
+        throw new IAE("fromKey: %s, key: %s", fromKey, key);
       }
     }
 
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java b/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
index 26e34da..afa8460 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
@@ -113,13 +113,13 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
   @Override
   public Iterator<PartitionChunk<T>> iterator()
   {
-    return overshadowableManager.getVisibles().iterator();
+    return overshadowableManager.getVisibleChunks().iterator();
   }
 
   @Override
   public Spliterator<PartitionChunk<T>> spliterator()
   {
-    return overshadowableManager.getVisibles().spliterator();
+    return overshadowableManager.getVisibleChunks().spliterator();
   }
 
   public Stream<PartitionChunk<T>> stream()
@@ -129,7 +129,7 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
 
   public List<PartitionChunk<T>> getOvershadowed()
   {
-    return overshadowableManager.getOvershadowed();
+    return overshadowableManager.getOvershadowedChunks();
   }
 
   public Iterable<T> payloads()
diff --git a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java
index 6971833..28b2dcca 100644
--- a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.timeline.partition.ImmutablePartitionHolder;
 import org.apache.druid.timeline.partition.IntegerPartitionChunk;
 import org.apache.druid.timeline.partition.NumberedOverwritingPartitionChunk;
 import org.apache.druid.timeline.partition.NumberedPartitionChunk;
+import org.apache.druid.timeline.partition.OvershadowableInteger;
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.PartitionHolder;
 import org.apache.druid.timeline.partition.PartitionIds;
@@ -48,7 +49,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -2145,116 +2145,4 @@ public class VersionedIntervalTimelineTest
   {
     return new VersionedIntervalTimeline<>(Ordering.natural());
   }
-
-  private static class OvershadowableInteger implements Overshadowable<OvershadowableInteger>
-  {
-    private final String majorVersion;
-    private final int partitionNum;
-    private final int val;
-    private final int startRootPartitionId;
-    private final int endRootPartitionId;
-    private final short minorVersion;
-    private final short atomicUpdateGroupSize;
-
-    private OvershadowableInteger(String majorVersion, int partitionNum, int val)
-    {
-      this(majorVersion, partitionNum, val, partitionNum, partitionNum + 1, 0, 1);
-    }
-
-    private OvershadowableInteger(
-        String majorVersion,
-        int partitionNum,
-        int val,
-        int startRootPartitionId,
-        int endRootPartitionId,
-        int minorVersion,
-        int atomicUpdateGroupSize
-    )
-    {
-      this.majorVersion = majorVersion;
-      this.partitionNum = partitionNum;
-      this.val = val;
-      this.startRootPartitionId = startRootPartitionId;
-      this.endRootPartitionId = endRootPartitionId;
-      this.minorVersion = (short) minorVersion;
-      this.atomicUpdateGroupSize = (short) atomicUpdateGroupSize;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      OvershadowableInteger that = (OvershadowableInteger) o;
-      return partitionNum == that.partitionNum &&
-             val == that.val &&
-             startRootPartitionId == that.startRootPartitionId &&
-             endRootPartitionId == that.endRootPartitionId &&
-             minorVersion == that.minorVersion &&
-             atomicUpdateGroupSize == that.atomicUpdateGroupSize &&
-             Objects.equals(majorVersion, that.majorVersion);
-    }
-
-    @Override
-    public int hashCode()
-    {
-      return Objects.hash(
-          majorVersion,
-          partitionNum,
-          val,
-          startRootPartitionId,
-          endRootPartitionId,
-          minorVersion,
-          atomicUpdateGroupSize
-      );
-    }
-
-    @Override
-    public String toString()
-    {
-      return "OvershadowableInteger{" +
-             "majorVersion='" + majorVersion + '\'' +
-             ", partitionNum=" + partitionNum +
-             ", val=" + val +
-             ", startRootPartitionId=" + startRootPartitionId +
-             ", endRootPartitionId=" + endRootPartitionId +
-             ", minorVersion=" + minorVersion +
-             ", atomicUpdateGroupSize=" + atomicUpdateGroupSize +
-             '}';
-    }
-
-    @Override
-    public int getStartRootPartitionId()
-    {
-      return startRootPartitionId;
-    }
-
-    @Override
-    public int getEndRootPartitionId()
-    {
-      return endRootPartitionId;
-    }
-
-    @Override
-    public String getVersion()
-    {
-      return majorVersion;
-    }
-
-    @Override
-    public short getMinorVersion()
-    {
-      return minorVersion;
-    }
-
-    @Override
-    public short getAtomicUpdateGroupSize()
-    {
-      return atomicUpdateGroupSize;
-    }
-  }
 }
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/IntegerPartitionChunkTest.java b/core/src/test/java/org/apache/druid/timeline/partition/IntegerPartitionChunkTest.java
index 04e4ccf..b783439 100644
--- a/core/src/test/java/org/apache/druid/timeline/partition/IntegerPartitionChunkTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/partition/IntegerPartitionChunkTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.timeline.partition;
 
-import org.apache.druid.timeline.Overshadowable;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -110,44 +109,4 @@ public class IntegerPartitionChunkTest
     Assert.assertEquals(make(10, null, 0, 1), make(10, null, 0, 1));
     Assert.assertEquals(make(10, 11, 0, 1), make(10, 11, 0, 1));
   }
-
-  private static class OvershadowableInteger implements Overshadowable<OvershadowableInteger>
-  {
-    private final int val;
-
-    OvershadowableInteger(int val)
-    {
-      this.val = val;
-    }
-
-    @Override
-    public int getStartRootPartitionId()
-    {
-      return 0;
-    }
-
-    @Override
-    public int getEndRootPartitionId()
-    {
-      return 1;
-    }
-
-    @Override
-    public String getVersion()
-    {
-      return "";
-    }
-
-    @Override
-    public short getMinorVersion()
-    {
-      return 0;
-    }
-
-    @Override
-    public short getAtomicUpdateGroupSize()
-    {
-      return 1;
-    }
-  }
 }
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableInteger.java b/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableInteger.java
new file mode 100644
index 0000000..1eca972
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableInteger.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import org.apache.druid.timeline.Overshadowable;
+
+import java.util.Objects;
+
+public class OvershadowableInteger implements Overshadowable<OvershadowableInteger>
+{
+  private final String majorVersion;
+  private final int partitionNum;
+  private final int val;
+  private final int startRootPartitionId;
+  private final int endRootPartitionId;
+  private final short minorVersion;
+  private final short atomicUpdateGroupSize;
+
+  public OvershadowableInteger(int val)
+  {
+    this("", 0, val);
+  }
+
+  public OvershadowableInteger(String majorVersion, int partitionNum, int val)
+  {
+    this(majorVersion, partitionNum, val, partitionNum, partitionNum + 1, 0, 1);
+  }
+
+  public OvershadowableInteger(
+      String majorVersion,
+      int partitionNum,
+      int val,
+      int startRootPartitionId,
+      int endRootPartitionId,
+      int minorVersion,
+      int atomicUpdateGroupSize
+  )
+  {
+    this.majorVersion = majorVersion;
+    this.partitionNum = partitionNum;
+    this.val = val;
+    this.startRootPartitionId = startRootPartitionId;
+    this.endRootPartitionId = endRootPartitionId;
+    this.minorVersion = (short) minorVersion;
+    this.atomicUpdateGroupSize = (short) atomicUpdateGroupSize;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    OvershadowableInteger that = (OvershadowableInteger) o;
+    return partitionNum == that.partitionNum &&
+           val == that.val &&
+           startRootPartitionId == that.startRootPartitionId &&
+           endRootPartitionId == that.endRootPartitionId &&
+           minorVersion == that.minorVersion &&
+           atomicUpdateGroupSize == that.atomicUpdateGroupSize &&
+           Objects.equals(majorVersion, that.majorVersion);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(
+        majorVersion,
+        partitionNum,
+        val,
+        startRootPartitionId,
+        endRootPartitionId,
+        minorVersion,
+        atomicUpdateGroupSize
+    );
+  }
+
+  @Override
+  public String toString()
+  {
+    return "OvershadowableInteger{" +
+           "majorVersion='" + majorVersion + '\'' +
+           ", partitionNum=" + partitionNum +
+           ", val=" + val +
+           ", startRootPartitionId=" + startRootPartitionId +
+           ", endRootPartitionId=" + endRootPartitionId +
+           ", minorVersion=" + minorVersion +
+           ", atomicUpdateGroupSize=" + atomicUpdateGroupSize +
+           '}';
+  }
+
+  @Override
+  public int getStartRootPartitionId()
+  {
+    return startRootPartitionId;
+  }
+
+  @Override
+  public int getEndRootPartitionId()
+  {
+    return endRootPartitionId;
+  }
+
+  @Override
+  public String getVersion()
+  {
+    return majorVersion;
+  }
+
+  @Override
+  public short getMinorVersion()
+  {
+    return minorVersion;
+  }
+
+  @Override
+  public short getAtomicUpdateGroupSize()
+  {
+    return atomicUpdateGroupSize;
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableManagerTest.java b/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableManagerTest.java
new file mode 100644
index 0000000..71566d0
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableManagerTest.java
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.timeline.partition.OvershadowableManager.RootPartitionRange;
+import org.apache.druid.timeline.partition.OvershadowableManager.State;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class OvershadowableManagerTest
+{
+  private static final String MAJOR_VERSION = "version";
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private OvershadowableManager<OvershadowableInteger> manager;
+  private int nextRootPartitionId;
+  private int nextNonRootPartitionId;
+  private List<PartitionChunk<OvershadowableInteger>> expectedVisibleChunks;
+  private List<PartitionChunk<OvershadowableInteger>> expectedOvershadowedChunks;
+  private List<PartitionChunk<OvershadowableInteger>> expectedStandbyChunks;
+
+  @Before
+  public void setup()
+  {
+    manager = new OvershadowableManager<>();
+    nextRootPartitionId = PartitionIds.ROOT_GEN_START_PARTITION_ID;
+    nextNonRootPartitionId = PartitionIds.NON_ROOT_GEN_START_PARTITION_ID;
+    expectedVisibleChunks = new ArrayList<>();
+    expectedOvershadowedChunks = new ArrayList<>();
+    expectedStandbyChunks = new ArrayList<>();
+  }
+
+  @Test
+  public void testFindOvershadowedBy()
+  {
+    final List<PartitionChunk<OvershadowableInteger>> expectedOvershadowedChunks = new ArrayList<>();
+
+    // All chunks except the last one are in the overshadowed state
+    PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(0, 2, 1, 1);
+    manager.addChunk(chunk);
+    chunk = newNonRootChunk(0, 3, 2, 1);
+    manager.addChunk(chunk);
+    chunk = newNonRootChunk(0, 5, 3, 1);
+    manager.addChunk(chunk);
+    chunk = newNonRootChunk(5, 8, 1, 1);
+    expectedOvershadowedChunks.add(chunk);
+    manager.addChunk(chunk);
+    chunk = newNonRootChunk(8, 11, 2, 1);
+    manager.addChunk(chunk);
+    chunk = newNonRootChunk(5, 11, 3, 1);
+    manager.addChunk(chunk);
+    chunk = newNonRootChunk(0, 12, 5, 1);
+    manager.addChunk(chunk);
+
+    List<AtomicUpdateGroup<OvershadowableInteger>> overshadowedGroups = manager.findOvershadowedBy(
+        RootPartitionRange.of(2, 10),
+        (short) 10,
+        State.OVERSHADOWED
+    );
+    Assert.assertEquals(
+        expectedOvershadowedChunks.stream().map(AtomicUpdateGroup::new).collect(Collectors.toList()),
+        overshadowedGroups
+    );
+
+    overshadowedGroups = manager.findOvershadowedBy(
+        RootPartitionRange.of(2, 10),
+        (short) 10,
+        State.VISIBLE
+    );
+    Assert.assertEquals(
+        Collections.emptyList(),
+        overshadowedGroups
+    );
+  }
+
+  @Test
+  public void testFindOvershadows()
+  {
+    PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(2, 6, 3, 1);
+    manager.addChunk(chunk);
+    chunk = newNonRootChunk(6, 8, 3, 1);
+    manager.addChunk(chunk);
+    chunk = newNonRootChunk(1, 8, 4, 1);
+    final PartitionChunk<OvershadowableInteger> visibleChunk = chunk;
+    manager.addChunk(chunk);
+
+    List<AtomicUpdateGroup<OvershadowableInteger>> overshadowingGroups = manager.findOvershadows(
+        RootPartitionRange.of(1, 3),
+        (short) 1,
+        State.OVERSHADOWED
+    );
+    Assert.assertEquals(
+        Collections.emptyList(),
+        overshadowingGroups
+    );
+    overshadowingGroups = manager.findOvershadows(
+        RootPartitionRange.of(1, 3),
+        (short) 1,
+        State.VISIBLE
+    );
+    Assert.assertEquals(
+        ImmutableList.of(new AtomicUpdateGroup<>(visibleChunk)),
+        overshadowingGroups
+    );
+
+    overshadowingGroups = manager.findOvershadows(
+        RootPartitionRange.of(4, 7),
+        (short) 1,
+        State.OVERSHADOWED
+    );
+    Assert.assertEquals(
+        Collections.emptyList(),
+        overshadowingGroups
+    );
+    overshadowingGroups = manager.findOvershadows(
+        RootPartitionRange.of(4, 7),
+        (short) 1,
+        State.VISIBLE
+    );
+    Assert.assertEquals(
+        ImmutableList.of(new AtomicUpdateGroup<>(visibleChunk)),
+        overshadowingGroups
+    );
+  }
+
+  @Test
+  public void testAddRootChunkToEmptyManager()
+  {
+    Assert.assertTrue(manager.isEmpty());
+    // Add a new one
+    PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+    Assert.assertTrue(manager.isComplete());
+    // Add a duplicate
+    Assert.assertFalse(manager.addChunk(chunk));
+    // Add a new one
+    chunk = newRootChunk();
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+    Assert.assertTrue(manager.isComplete());
+  }
+
+  @Test
+  public void testAddNonRootChunkToEmptyManager()
+  {
+    Assert.assertTrue(manager.isEmpty());
+    // Add a new one, atomicUpdateGroup is not full
+    PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(10, 12, 1, 3);
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+    Assert.assertFalse(manager.isComplete());
+    // Add a new one, atomicUpdateGroup is still not full
+    chunk = newNonRootChunk(10, 12, 1, 3);
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+    Assert.assertFalse(manager.isComplete());
+    // Add a new one, now atomicUpdateGroup is full
+    chunk = newNonRootChunk(10, 12, 1, 3);
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+    Assert.assertTrue(manager.isComplete());
+
+    // Add a new one to the full group
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage("Can't add chunk");
+    chunk = newNonRootChunk(10, 12, 1, 3);
+    addVisibleToManager(chunk);
+  }
+
+  @Test
+  public void testRemoveFromEmptyManager()
+  {
+    Assert.assertTrue(manager.isEmpty());
+    PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+    Assert.assertNull(manager.removeChunk(chunk));
+  }
+
+  @Test
+  public void testAddOvershadowedChunkToCompletePartition()
+  {
+    // Start with a non-root incomplete partitionChunk
+    PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(0, 3, 1, 2);
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Add another visible non-root chunk of the same group; now this group is complete
+    chunk = newNonRootChunk(0, 3, 1, 2);
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Add an overshadowed chunk
+    nextRootPartitionId = 1;
+    chunk = newRootChunk();
+    Assert.assertTrue(manager.addChunk(chunk));
+    expectedOvershadowedChunks.add(chunk);
+    assertManagerState();
+  }
+
+  /**
+   * A root chunk (partition id 1) added while the only visible non-root group over [0, 3) is still
+   * incomplete (1 of 2 chunks) is expected to be tracked as overshadowed as well.
+   */
+  @Test
+  public void testAddOvershadowedChunkToIncompletePartition()
+  {
+    // Start with a non-root partitionChunk. This group is incomplete.
+    PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(0, 3, 1, 2);
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Add an overshadowed chunk
+    nextRootPartitionId = 1;
+    chunk = newRootChunk();
+    expectedOvershadowedChunks.add(chunk);
+    Assert.assertTrue(manager.addChunk(chunk));
+    assertManagerState();
+  }
+
+  /**
+   * With three root chunks visible, the first chunk of a 2-chunk non-root group over [0, 3) is expected
+   * to go to standby. Once the second chunk arrives, the root chunks are expected to become overshadowed
+   * and the whole non-root group visible.
+   */
+  @Test
+  public void testAddStandbyChunksToCompletePartition()
+  {
+    // Add complete chunks
+    PartitionChunk<OvershadowableInteger> chunk;
+    for (int i = 0; i < 3; i++) {
+      chunk = newRootChunk();
+      Assert.assertTrue(addVisibleToManager(chunk));
+      assertManagerState();
+    }
+
+    // Add a chunk of an incomplete group
+    chunk = newNonRootChunk(0, 3, 1, 2);
+    expectedStandbyChunks.add(chunk);
+    Assert.assertTrue(manager.addChunk(chunk));
+    assertManagerState();
+
+    // This group is now full
+    chunk = newNonRootChunk(0, 3, 1, 2);
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    expectedVisibleChunks.addAll(expectedStandbyChunks);
+    expectedStandbyChunks.clear();
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+  }
+
+  /**
+   * An incomplete group with a higher minor version (2, group size 3) is added on top of an incomplete
+   * group with minor version 1 over the same range [0, 3). The older chunk is expected to become
+   * overshadowed and the newer chunks visible, even though the newer group never becomes complete here.
+   */
+  @Test
+  public void testAddStandbyChunksToIncompletePartition()
+  {
+    // Add a chunk of an incomplete group
+    PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(0, 3, 1, 2);
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Add chunks of an incomplete group overshadowing the previous one
+    chunk = newNonRootChunk(0, 3, 2, 3);
+    expectedOvershadowedChunks.add(expectedVisibleChunks.remove(0));
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    chunk = newNonRootChunk(0, 3, 2, 3);
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+  }
+
+  /**
+   * Removing a chunk that was never added is expected to return null and leave the manager state unchanged.
+   */
+  @Test
+  public void testRemoveUnknownChunk()
+  {
+    PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // A second root chunk that is created but never added to the manager
+    chunk = newRootChunk();
+    Assert.assertNull(manager.removeChunk(chunk));
+    assertManagerState();
+  }
+
+  /**
+   * Adds 10 root chunks and removes them one by one in random order, checking the manager state after
+   * every step and that the manager is empty at the end.
+   */
+  @Test
+  public void testRemoveChunksUntilEmpty()
+  {
+    PartitionChunk<OvershadowableInteger> chunk;
+    for (int i = 0; i < 10; i++) {
+      chunk = newRootChunk();
+      Assert.assertTrue(addVisibleToManager(chunk));
+      assertManagerState();
+    }
+
+    // The removal order is randomized, so it differs from run to run
+    while (expectedVisibleChunks.size() > 0) {
+      chunk = expectedVisibleChunks.remove(ThreadLocalRandom.current().nextInt(expectedVisibleChunks.size()));
+      Assert.assertEquals(chunk, manager.removeChunk(chunk));
+      assertManagerState();
+    }
+
+    Assert.assertTrue(manager.isEmpty());
+  }
+
+  /**
+   * With three root chunks visible and two chunks of a 3-chunk non-root group in standby, removing one
+   * standby chunk is expected to return that chunk and leave the visible chunks untouched.
+   */
+  @Test
+  public void testRemoveStandbyChunk()
+  {
+    // Add complete groups
+    PartitionChunk<OvershadowableInteger> chunk;
+    for (int i = 0; i < 3; i++) {
+      chunk = newRootChunk();
+      Assert.assertTrue(addVisibleToManager(chunk));
+      assertManagerState();
+    }
+
+    // Add two chunks of an incomplete group (2 of 3) which would overshadow the previous ones once complete
+    chunk = newNonRootChunk(0, 3, 1, 3);
+    expectedStandbyChunks.add(chunk);
+    Assert.assertTrue(manager.addChunk(chunk));
+    assertManagerState();
+    chunk = newNonRootChunk(0, 3, 1, 3);
+    expectedStandbyChunks.add(chunk);
+    Assert.assertTrue(manager.addChunk(chunk));
+    assertManagerState();
+
+    // Remove one standby chunk
+    chunk = expectedStandbyChunks.remove(0);
+    Assert.assertEquals(chunk, manager.removeChunk(chunk));
+    assertManagerState();
+  }
+
+  /**
+   * With two root chunks visible and two chunks of a 3-chunk non-root group over [0, 2) in standby,
+   * removing one visible root chunk is expected to make the standby chunks visible and the remaining
+   * root chunk overshadowed.
+   */
+  @Test
+  public void testRemoveVisibleChunkAndFallBackToStandby()
+  {
+    // Add two complete groups
+    PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+    chunk = newRootChunk();
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Add two chunks of an incomplete group
+    chunk = newNonRootChunk(0, 2, 1, 3);
+    expectedStandbyChunks.add(chunk);
+    Assert.assertTrue(manager.addChunk(chunk));
+    assertManagerState();
+    chunk = newNonRootChunk(0, 2, 1, 3);
+    expectedStandbyChunks.add(chunk);
+    Assert.assertTrue(manager.addChunk(chunk));
+    assertManagerState();
+
+    // Remove a visible root chunk. The remaining root chunk becomes overshadowed and the standby group visible.
+    chunk = expectedVisibleChunks.remove(0);
+    Assert.assertEquals(chunk, manager.removeChunk(chunk));
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    expectedVisibleChunks.addAll(expectedStandbyChunks);
+    expectedStandbyChunks.clear();
+    assertManagerState();
+  }
+
+  /**
+   * Two chunks of a 3-chunk non-root group over [0, 2) are visible. The first root chunk added afterwards
+   * is expected to be overshadowed; when the second root chunk arrives, both root chunks are expected to
+   * become visible and the incomplete non-root group to move to standby.
+   */
+  @Test
+  public void testAddCompleteOvershadowedToInCompletePartition()
+  {
+    // Add an incomplete group
+    PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(0, 2, 1, 3);
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+    chunk = newNonRootChunk(0, 2, 1, 3);
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Add a complete overshadowed group
+    chunk = newRootChunk();
+    expectedOvershadowedChunks.add(chunk);
+    Assert.assertTrue(manager.addChunk(chunk));
+    assertManagerState();
+    chunk = newRootChunk();
+    expectedStandbyChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    expectedVisibleChunks.add(expectedOvershadowedChunks.remove(0));
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+  }
+
+  /**
+   * With a complete 2-chunk non-root group over [0, 2) visible, two root chunks added afterwards are
+   * both expected to be tracked as overshadowed while the non-root group stays visible.
+   */
+  @Test
+  public void testAddCompleteOvershadowedToCompletePartition()
+  {
+    // Add a complete group
+    PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(0, 2, 1, 2);
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+    chunk = newNonRootChunk(0, 2, 1, 2);
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Add a complete overshadowed group
+    chunk = newRootChunk();
+    expectedOvershadowedChunks.add(chunk);
+    Assert.assertTrue(manager.addChunk(chunk));
+    assertManagerState();
+    chunk = newRootChunk();
+    expectedOvershadowedChunks.add(chunk);
+    Assert.assertTrue(manager.addChunk(chunk));
+    assertManagerState();
+  }
+
+  /**
+   * A root chunk (partition id 1) is overshadowed by one chunk of a 2-chunk non-root group over [0, 2).
+   * Removing the overshadowed root chunk is expected to return it and keep the non-root chunk visible.
+   */
+  @Test
+  public void testRemoveChunkFromOvershadowd()
+  {
+    // Add a complete group
+    nextRootPartitionId = 1;
+    PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Add an incomplete group of a larger partition range
+    chunk = newNonRootChunk(0, 2, 1, 2);
+    expectedOvershadowedChunks.add(expectedVisibleChunks.remove(0));
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Remove an overshadowed chunk
+    chunk = expectedOvershadowedChunks.remove(0);
+    Assert.assertEquals(chunk, manager.removeChunk(chunk));
+    assertManagerState();
+  }
+
+  /**
+   * A root chunk (partition id 1) is overshadowed by a complete 2-chunk non-root group over [0, 2).
+   * After the first visible chunk is removed the other one is still expected to be visible; after the
+   * second is removed, the overshadowed root chunk is expected to become visible again.
+   */
+  @Test
+  public void testRemoveChunkFromCompleteParition()
+  {
+    // Add a complete group
+    nextRootPartitionId = 1;
+    PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Add a complete group overshadowing the previous
+    chunk = newNonRootChunk(0, 2, 1, 2);
+    expectedOvershadowedChunks.add(expectedVisibleChunks.remove(0));
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+    chunk = newNonRootChunk(0, 2, 1, 2);
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Remove a chunk from the visible group
+    chunk = expectedVisibleChunks.remove(0);
+    Assert.assertEquals(chunk, manager.removeChunk(chunk));
+    assertManagerState();
+
+    // Remove another chunk from the visible group. Now the overshadowed group should be visible.
+    chunk = expectedVisibleChunks.remove(0);
+    expectedVisibleChunks.addAll(expectedOvershadowedChunks);
+    expectedOvershadowedChunks.clear();
+    Assert.assertEquals(chunk, manager.removeChunk(chunk));
+    assertManagerState();
+  }
+
+  /**
+   * Two root chunks are overshadowed by a complete 2-chunk non-root group over [0, 2). Removing one
+   * chunk of the visible group is expected to make the two root chunks visible again and to move the
+   * remaining non-root chunk to standby.
+   */
+  @Test
+  public void testRemoveChunkFromCompletePartitionFallBackToOvershadowed()
+  {
+    // Add complete groups
+    PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+    chunk = newRootChunk();
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Add a complete group overshadowing the previous. Its first chunk stays in standby until the group is full.
+    chunk = newNonRootChunk(0, 2, 1, 2);
+    expectedStandbyChunks.add(chunk);
+    Assert.assertTrue(manager.addChunk(chunk));
+    assertManagerState();
+
+    chunk = newNonRootChunk(0, 2, 1, 2);
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    expectedVisibleChunks.add(expectedStandbyChunks.remove(0));
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Remove a visible chunk. Should fall back to the complete overshadowed group.
+    chunk = expectedVisibleChunks.remove(0);
+    expectedStandbyChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    expectedVisibleChunks.addAll(expectedOvershadowedChunks);
+    expectedOvershadowedChunks.clear();
+    Assert.assertEquals(chunk, manager.removeChunk(chunk));
+    assertManagerState();
+  }
+
+  /**
+   * Builds several layers of incomplete overshadowed groups beneath a complete visible group over [0, 10)
+   * (minor version 4), plus one incomplete standby group (minor version 5). Some overshadowed groups are
+   * then completed, and one visible chunk is removed. The expected fallback is to the completed groups
+   * [0, 5) (minor version 2), [5, 8) and [8, 10) (minor version 1); the rest of the previously visible
+   * group and the [0, 8) group (minor version 3) are expected in standby.
+   */
+  @Test
+  public void testAddCompleteOvershadowedToCompletePartition2()
+  {
+    // Add overshadowed incomplete groups
+    List<PartitionChunk<OvershadowableInteger>> chunks = newNonRootChunks(2, 0, 2, 1, 3);
+    expectedOvershadowedChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+    chunks = newNonRootChunks(2, 2, 5, 1, 3);
+    expectedOvershadowedChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+    chunks = newNonRootChunks(2, 5, 8, 1, 3);
+    expectedOvershadowedChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+    chunks = newNonRootChunks(2, 8, 10, 1, 3);
+    expectedOvershadowedChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    chunks = newNonRootChunks(2, 0, 5, 2, 3);
+    expectedOvershadowedChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    chunks = newNonRootChunks(2, 0, 8, 3, 3);
+    expectedOvershadowedChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    // Add a visible complete group
+    chunks = newNonRootChunks(2, 0, 10, 4, 2);
+    expectedVisibleChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    // Add a standby incomplete group
+    chunks = newNonRootChunks(1, 0, 10, 5, 2);
+    expectedStandbyChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    assertManagerState();
+
+    // Add the missing third chunk to the overshadowed groups [0, 5) (minor version 2),
+    // [5, 8) and [8, 10) (minor version 1) so that they become complete
+    chunks = newNonRootChunks(1, 0, 5, 2, 3);
+    expectedOvershadowedChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+    chunks = newNonRootChunks(1, 5, 8, 1, 3);
+    expectedOvershadowedChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+    chunks = newNonRootChunks(1, 8, 10, 1, 3);
+    expectedOvershadowedChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    assertManagerState();
+
+    // Remove a chunk from the visible group
+    PartitionChunk<OvershadowableInteger> chunkToRemove = expectedVisibleChunks.remove(0);
+    expectedStandbyChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    // Re-classify the overshadowed chunks: the completed groups become visible, groups with a higher
+    // minor version at the same start id go to standby, everything else stays overshadowed
+    Iterator<PartitionChunk<OvershadowableInteger>> iterator = expectedOvershadowedChunks.iterator();
+    while (iterator.hasNext()) {
+      final PartitionChunk<OvershadowableInteger> chunk = iterator.next();
+      if (chunk.getObject().getStartRootPartitionId() == 0 && chunk.getObject().getMinorVersion() == 2
+          || chunk.getObject().getStartRootPartitionId() == 5 && chunk.getObject().getMinorVersion() == 1
+          || chunk.getObject().getStartRootPartitionId() == 8 && chunk.getObject().getMinorVersion() == 1) {
+        expectedVisibleChunks.add(chunk);
+        iterator.remove();
+      } else if (chunk.getObject().getStartRootPartitionId() == 0 && chunk.getObject().getMinorVersion() > 2
+                 || chunk.getObject().getStartRootPartitionId() == 5 && chunk.getObject().getMinorVersion() > 1
+                 || chunk.getObject().getStartRootPartitionId() == 8 && chunk.getObject().getMinorVersion() > 1) {
+        expectedStandbyChunks.add(chunk);
+        iterator.remove();
+      }
+    }
+    Assert.assertEquals(chunkToRemove, manager.removeChunk(chunkToRemove));
+    assertManagerState();
+  }
+
+  /**
+   * Starts with complete visible groups over [0, 5) and [5, 10) (minor version 2) on top of overshadowed
+   * groups, plus several incomplete standby groups with higher minor versions. Completing the standby
+   * groups [0, 5) (minor version 4) and [5, 10) (minor version 3) is expected to make them visible and to
+   * overshadow the previously visible groups and the standby group [0, 5) with minor version 3. The
+   * incomplete [0, 10) group (minor version 5) is expected to stay in standby.
+   */
+  @Test
+  public void testAddCompleteStandbyToCompletePartition()
+  {
+    // Add overshadowed groups
+    List<PartitionChunk<OvershadowableInteger>> chunks = newNonRootChunks(2, 0, 2, 1, 3);
+    expectedOvershadowedChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+    chunks = newNonRootChunks(2, 2, 5, 1, 3);
+    expectedOvershadowedChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+    chunks = newNonRootChunks(2, 5, 8, 1, 3);
+    expectedOvershadowedChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+    chunks = newNonRootChunks(2, 8, 10, 1, 2);
+    expectedOvershadowedChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    // Visible group for [0, 5)
+    chunks = newNonRootChunks(2, 0, 5, 2, 2);
+    expectedVisibleChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+    // Visible group for [5, 10)
+    chunks = newNonRootChunks(2, 5, 10, 2, 2);
+    expectedVisibleChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    // Standby groups
+    chunks = newNonRootChunks(2, 0, 5, 3, 3);
+    expectedStandbyChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+    chunks = newNonRootChunks(2, 5, 10, 3, 3);
+    expectedStandbyChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    chunks = newNonRootChunks(2, 0, 5, 4, 3);
+    expectedStandbyChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    chunks = newNonRootChunks(2, 0, 10, 5, 3);
+    expectedStandbyChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    assertManagerState();
+
+    // Add the missing third chunk to the standby groups [0, 5) (minor version 4) and
+    // [5, 10) (minor version 3) so that they become complete
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    chunks = newNonRootChunks(1, 0, 5, 4, 3);
+    chunks.forEach(this::addVisibleToManager);
+    chunks = newNonRootChunks(1, 5, 10, 3, 3);
+    chunks.forEach(this::addVisibleToManager);
+
+    // Re-classify the standby chunks: the completed groups become visible, groups with a lower minor
+    // version at the same start id become overshadowed, everything else stays in standby
+    Iterator<PartitionChunk<OvershadowableInteger>> iterator = expectedStandbyChunks.iterator();
+    while (iterator.hasNext()) {
+      final PartitionChunk<OvershadowableInteger> chunk = iterator.next();
+      if (chunk.getObject().getStartRootPartitionId() == 0 && chunk.getObject().getMinorVersion() == 4
+          || chunk.getObject().getStartRootPartitionId() == 5 && chunk.getObject().getMinorVersion() == 3) {
+        expectedVisibleChunks.add(chunk);
+        iterator.remove();
+      } else if (chunk.getObject().getStartRootPartitionId() == 0 && chunk.getObject().getMinorVersion() < 4
+                 || chunk.getObject().getStartRootPartitionId() == 5 && chunk.getObject().getMinorVersion() < 3) {
+        expectedOvershadowedChunks.add(chunk);
+        iterator.remove();
+      }
+    }
+
+    assertManagerState();
+  }
+
+  /**
+   * Over the range [0, 2): one incomplete overshadowed group (minor version 1), one complete visible
+   * group (minor version 2) and three incomplete standby groups (minor versions 3, 4 and 5). Removing a
+   * visible chunk is expected to make the minor version 5 chunks visible and everything else overshadowed.
+   */
+  @Test
+  public void testFallBackToStandby2()
+  {
+    // Add an overshadowed incomplete group
+    List<PartitionChunk<OvershadowableInteger>> chunks = newNonRootChunks(2, 0, 2, 1, 3);
+    expectedOvershadowedChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    // Add a visible complete group
+    chunks = newNonRootChunks(2, 0, 2, 2, 2);
+    expectedVisibleChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    // Add three standby incomplete groups
+    chunks = newNonRootChunks(2, 0, 2, 3, 3);
+    expectedStandbyChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    chunks = newNonRootChunks(2, 0, 2, 4, 3);
+    expectedStandbyChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    chunks = newNonRootChunks(2, 0, 2, 5, 3);
+    expectedStandbyChunks.addAll(chunks);
+    chunks.forEach(manager::addChunk);
+
+    assertManagerState();
+
+    // Remove a visible chunk. The latest standby group should be visible.
+    PartitionChunk<OvershadowableInteger> chunkToRemove = expectedVisibleChunks.remove(0);
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    // Every standby chunk leaves the standby list: minor version 5 becomes visible, the rest overshadowed
+    Iterator<PartitionChunk<OvershadowableInteger>> iterator = expectedStandbyChunks.iterator();
+    while (iterator.hasNext()) {
+      final PartitionChunk<OvershadowableInteger> chunk = iterator.next();
+      if (chunk.getObject().getMinorVersion() == 5) {
+        expectedVisibleChunks.add(chunk);
+        iterator.remove();
+      } else {
+        expectedOvershadowedChunks.add(chunk);
+        iterator.remove();
+      }
+    }
+
+    Assert.assertEquals(chunkToRemove, manager.removeChunk(chunkToRemove));
+    assertManagerState();
+  }
+
+  /**
+   * Starts with five root chunks, overwrites a sub-range of them with a complete 2-chunk group (minor
+   * version 3), appends three more root chunks, and finally adds a complete 2-chunk group over the same
+   * sub-range with a lower minor version (2), which is expected to be tracked as overshadowed.
+   */
+  @Test
+  public void testAddAndOverwriteAndAdd()
+  {
+    // Start with root partitionChunks
+    for (int i = 0; i < 5; i++) {
+      PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+      Assert.assertTrue(addVisibleToManager(chunk));
+    }
+    assertManagerState();
+
+    // Overwrite some partitionChunks with a higher minor version
+    final int rootStartPartitionIdToOverwrite = expectedVisibleChunks.get(1).getChunkNumber();
+    final int rootEndPartitionIdToOverwrite = expectedVisibleChunks.get(3).getChunkNumber();
+    for (int i = 0; i < 2; i++) {
+      PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(
+          rootStartPartitionIdToOverwrite,
+          rootEndPartitionIdToOverwrite,
+          3,
+          2
+      );
+      Assert.assertTrue(manager.addChunk(chunk));
+      // The first chunk of the group waits in standby
+      if (i == 0) {
+        expectedStandbyChunks.add(chunk);
+      }
+      // The second chunk completes the group: the root chunks at list indexes 1 and 2 become overshadowed
+      if (i == 1) {
+        expectedOvershadowedChunks.addAll(expectedVisibleChunks.subList(1, 3));
+        expectedVisibleChunks.subList(1, 3).clear();
+        expectedVisibleChunks.addAll(expectedStandbyChunks);
+        expectedVisibleChunks.add(chunk);
+        expectedStandbyChunks.clear();
+      }
+      assertManagerState();
+    }
+
+    // Append new visible chunks
+    for (int i = 0; i < 3; i++) {
+      PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+      Assert.assertTrue(addVisibleToManager(chunk));
+    }
+    assertManagerState();
+
+    // Append complete overshadowed chunks
+    for (int i = 0; i < 2; i++) {
+      PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(
+          rootStartPartitionIdToOverwrite,
+          rootEndPartitionIdToOverwrite,
+          2,
+          2
+      );
+      expectedOvershadowedChunks.add(chunk);
+      Assert.assertTrue(manager.addChunk(chunk));
+      assertManagerState();
+    }
+  }
+
+  /**
+   * Starts with five root chunks, two of which get overshadowed by a complete 2-chunk non-root group.
+   * Removes one overshadowed chunk and then one chunk of the overshadowing group, checking the manager
+   * state after each removal.
+   */
+  @Test
+  public void testRemoveOvershadowed()
+  {
+    // Start with root partitionChunks
+    for (int i = 0; i < 5; i++) {
+      PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+      Assert.assertTrue(addVisibleToManager(chunk));
+    }
+
+    // Overwrite some partitionChunks with a higher minor version
+    final int rootStartPartitionIdToOverwrite = expectedVisibleChunks.get(1).getChunkNumber();
+    final int rootEndPartitionIdToOverwrite = expectedVisibleChunks.get(3).getChunkNumber();
+    for (int i = 0; i < 2; i++) {
+      PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(
+          rootStartPartitionIdToOverwrite,
+          rootEndPartitionIdToOverwrite,
+          1,
+          2
+      );
+      Assert.assertTrue(addVisibleToManager(chunk));
+    }
+    // The root chunks at list indexes 1 and 2 are now expected to be overshadowed
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks.subList(1, 3));
+    IntStream.range(0, 2).forEach(i -> expectedVisibleChunks.remove(1));
+    assertManagerState();
+
+    // Remove an overshadowed chunk
+    PartitionChunk<OvershadowableInteger> chunk = expectedOvershadowedChunks.remove(0);
+    Assert.assertEquals(chunk, manager.removeChunk(chunk));
+    assertManagerState();
+
+    // Remove a chunk that overshadows others. The loop must stop right after the removal because
+    // removeVisibleFromManager() modifies the list being iterated.
+    for (PartitionChunk<OvershadowableInteger> visibleChunk : expectedVisibleChunks) {
+      if (visibleChunk.getChunkNumber() >= PartitionIds.NON_ROOT_GEN_START_PARTITION_ID) {
+        Assert.assertEquals(visibleChunk, removeVisibleFromManager(visibleChunk));
+        break;
+      }
+    }
+    assertManagerState();
+  }
+
+  /**
+   * Starts with five root chunks, two of which get overshadowed by a complete 2-chunk non-root group.
+   * Removing one chunk of the overshadowing group is expected to return that chunk, move the other
+   * chunk of the group to standby, and make the overshadowed root chunks visible again.
+   */
+  @Test
+  public void testRemoveOvershadowingVisible()
+  {
+    // Start with root partitionChunks
+    for (int i = 0; i < 5; i++) {
+      PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+      Assert.assertTrue(addVisibleToManager(chunk));
+    }
+
+    // Overwrite some partitionChunks with a higher minor version
+    final int rootStartPartitionIdToOverwrite = expectedVisibleChunks.get(1).getChunkNumber();
+    final int rootEndPartitionIdToOverwrite = expectedVisibleChunks.get(3).getChunkNumber();
+    for (int i = 0; i < 2; i++) {
+      PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(
+          rootStartPartitionIdToOverwrite,
+          rootEndPartitionIdToOverwrite,
+          1,
+          2
+      );
+      Assert.assertTrue(addVisibleToManager(chunk));
+    }
+    // The root chunks at list indexes 1 and 2 are now expected to be overshadowed
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks.subList(1, 3));
+    IntStream.range(0, 2).forEach(i -> expectedVisibleChunks.remove(1));
+    assertManagerState();
+
+    // Remove a chunk that overshadows others. Only the first non-root chunk is removed from the manager;
+    // the other one is expected to fall back to standby.
+    boolean removed = false;
+    final Iterator<PartitionChunk<OvershadowableInteger>> iterator = expectedVisibleChunks.iterator();
+    while (iterator.hasNext()) {
+      final PartitionChunk<OvershadowableInteger> visibleChunk = iterator.next();
+      if (visibleChunk.getChunkNumber() >= PartitionIds.NON_ROOT_GEN_START_PARTITION_ID) {
+        iterator.remove();
+        if (!removed) {
+          // Check the returned chunk, as every other removal in this test class does
+          Assert.assertEquals(visibleChunk, manager.removeChunk(visibleChunk));
+          removed = true;
+        } else {
+          expectedStandbyChunks.add(visibleChunk);
+        }
+      }
+    }
+    expectedVisibleChunks.addAll(expectedOvershadowedChunks);
+    expectedOvershadowedChunks.clear();
+    assertManagerState();
+  }
+
+  /**
+   * One root chunk is visible while two incomplete non-root groups over [0, 1) (minor versions 1 and 2)
+   * wait in standby. Removing the root chunk is expected to make the minor version 2 chunk visible and
+   * the minor version 1 chunk overshadowed.
+   */
+  @Test
+  public void testFallBackToStandbyOnRemove()
+  {
+    PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Add a chunk of an incomplete atomicUpdateGroup
+    chunk = newNonRootChunk(0, 1, 1, 3);
+    expectedStandbyChunks.add(chunk);
+    Assert.assertTrue(manager.addChunk(chunk));
+    assertManagerState();
+
+    // Add a chunk of an incomplete atomicUpdateGroup which overshadows the previous one
+    chunk = newNonRootChunk(0, 1, 2, 2);
+    expectedStandbyChunks.add(chunk);
+    Assert.assertTrue(manager.addChunk(chunk));
+    assertManagerState();
+
+    // Remove the visible chunk. Index 1 must be taken out of the standby list before index 0.
+    chunk = expectedVisibleChunks.remove(0);
+    expectedVisibleChunks.add(expectedStandbyChunks.remove(1));
+    expectedOvershadowedChunks.add(expectedStandbyChunks.remove(0));
+    Assert.assertEquals(chunk, manager.removeChunk(chunk));
+    assertManagerState();
+  }
+
+  /**
+   * Over the range [10, 20): an incomplete visible group (minor version 5) overshadows two incomplete
+   * groups (minor versions 4 and 3). After both visible chunks are removed, the minor version 4 chunks
+   * are expected to become visible while the minor version 3 chunks stay overshadowed.
+   */
+  @Test
+  public void testFallBackToOvershadowedOnRemove()
+  {
+    PartitionChunk<OvershadowableInteger> chunk;
+    // Add incomplete non-root group
+    for (int i = 0; i < 2; i++) {
+      chunk = newNonRootChunk(10, 20, 5, 3);
+      Assert.assertTrue(addVisibleToManager(chunk));
+    }
+    assertManagerState();
+
+    // Add two incomplete non-root groups (minor versions 4 and 3) overshadowed by the previous one
+    for (int i = 0; i < 2; i++) {
+      chunk = newNonRootChunk(10, 20, 4, 3);
+      expectedOvershadowedChunks.add(chunk);
+      Assert.assertTrue(manager.addChunk(chunk));
+      chunk = newNonRootChunk(10, 20, 3, 3);
+      expectedOvershadowedChunks.add(chunk);
+      Assert.assertTrue(manager.addChunk(chunk));
+    }
+    assertManagerState();
+
+    // Remove the visible group one by one
+    chunk = expectedVisibleChunks.remove(0);
+    Assert.assertEquals(chunk, manager.removeChunk(chunk));
+    assertManagerState();
+
+    // After the last visible chunk is gone, the minor version 4 chunks move from overshadowed to visible
+    chunk = expectedVisibleChunks.remove(0);
+    expectedOvershadowedChunks
+        .stream()
+        .filter(c -> c.getObject().getMinorVersion() == 4)
+        .forEach(c -> expectedVisibleChunks.add(c));
+    expectedOvershadowedChunks.removeAll(expectedVisibleChunks);
+    Assert.assertEquals(chunk, manager.removeChunk(chunk));
+    assertManagerState();
+  }
+
+  /**
+   * Adds three incomplete groups over [0, 1) with increasing minor versions (1, 2, 3); each new one is
+   * expected to overshadow the previous. When the minor version 2 group is then completed, it is
+   * expected to become visible and the incomplete minor version 3 chunk to move to standby.
+   */
+  @Test
+  public void testAddIncompleteAtomicUpdateGroups()
+  {
+    // Add an incomplete chunk
+    PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(0, 1, 1, 3);
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Add an incomplete chunk overshadowing the previous one. The atomicUpdateGroup of this chunk
+    // will be complete later in this test.
+    chunk = newNonRootChunk(0, 1, 2, 2);
+    expectedOvershadowedChunks.add(expectedVisibleChunks.remove(0));
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Add an incomplete chunk overshadowing the previous one
+    chunk = newNonRootChunk(0, 1, 3, 5);
+    expectedOvershadowedChunks.add(expectedVisibleChunks.remove(0));
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Add a chunk to complete the second atomicUpdateGroup overshadowed by the previous one.
+    // The last overshadowed entry is the minor version 2 chunk added above.
+    chunk = newNonRootChunk(0, 1, 2, 2);
+    expectedStandbyChunks.add(expectedVisibleChunks.remove(0));
+    expectedVisibleChunks.add(expectedOvershadowedChunks.remove(expectedOvershadowedChunks.size() - 1));
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+  }
+
+  /**
+   * The only root chunk has partition id 2, i.e. root partitions 0 and 1 are absent. A complete 2-chunk
+   * group over [0, 3) is expected to overshadow it, and removing that whole group is expected to make
+   * the root chunk visible again.
+   */
+  @Test
+  public void testMissingStartRootPartitionId()
+  {
+    // Simulate the first two chunks are missing at the root level
+    nextRootPartitionId = 2;
+    PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+    Assert.assertTrue(addVisibleToManager(chunk));
+    assertManagerState();
+
+    // Add a new group that overshadows the previous one
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    for (int i = 0; i < 2; i++) {
+      chunk = newNonRootChunk(0, 3, 1, 2);
+      Assert.assertTrue(addVisibleToManager(chunk));
+    }
+    assertManagerState();
+
+    // Remove the visible group
+    for (int i = 0; i < 2; i++) {
+      chunk = expectedVisibleChunks.remove(0);
+      Assert.assertEquals(chunk, manager.removeChunk(chunk));
+    }
+    expectedVisibleChunks.addAll(expectedOvershadowedChunks);
+    expectedOvershadowedChunks.clear();
+    assertManagerState();
+  }
+
+  private boolean addVisibleToManager(PartitionChunk<OvershadowableInteger> chunk)
+  {
+    expectedVisibleChunks.add(chunk);
+    return manager.addChunk(chunk);
+  }
+
+  private PartitionChunk<OvershadowableInteger> removeVisibleFromManager(PartitionChunk<OvershadowableInteger> chunk)
+  {
+    expectedVisibleChunks.remove(chunk);
+    return manager.removeChunk(chunk);
+  }
+
+  /**
+   * Asserts that the manager's visible, overshadowed and standby chunks match the three expected lists
+   * maintained by the test. Both sides are copied into HashSets first, so ordering is not compared.
+   */
+  private void assertManagerState()
+  {
+    Assert.assertEquals(
+        "Mismatched visible chunks",
+        new HashSet<>(expectedVisibleChunks),
+        new HashSet<>(manager.getVisibleChunks())
+    );
+    Assert.assertEquals(
+        "Mismatched overshadowed chunks",
+        new HashSet<>(expectedOvershadowedChunks),
+        new HashSet<>(manager.getOvershadowedChunks())
+    );
+    Assert.assertEquals(
+        "Mismatched standby chunks",
+        new HashSet<>(expectedStandbyChunks),
+        new HashSet<>(manager.getStandbyChunks())
+    );
+  }
+
+  private List<PartitionChunk<OvershadowableInteger>> newNonRootChunks(
+      int n,
+      int startPartitionId,
+      int endPartitionId,
+      int minorVersion,
+      int atomicUpdateGroupSize
+  )
+  {
+    return IntStream
+        .range(0, n)
+        .mapToObj(i -> newNonRootChunk(startPartitionId, endPartitionId, minorVersion, atomicUpdateGroupSize))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Creates a root chunk using the next root partition id, which is passed both as the chunk's first
+   * constructor argument and to the wrapped {@link OvershadowableInteger}.
+   */
+  private NumberedPartitionChunk<OvershadowableInteger> newRootChunk()
+  {
+    final int rootPartitionId = nextRootPartitionId();
+    final OvershadowableInteger payload = new OvershadowableInteger(MAJOR_VERSION, rootPartitionId, 0);
+    return new NumberedPartitionChunk<>(rootPartitionId, 0, payload);
+  }
+
+  private NumberedOverwritingPartitionChunk<OvershadowableInteger> newNonRootChunk(
+      int startRootPartitionId,
+      int endRootPartitionId,
+      int minorVersion,
+      int atomicUpdateGroupSize
+  )
+  {
+    final int partitionId = nextNonRootPartitionId();
+    return new NumberedOverwritingPartitionChunk<>(
+        partitionId,
+        new OvershadowableInteger(
+            MAJOR_VERSION,
+            partitionId,
+            0,
+            startRootPartitionId,
+            endRootPartitionId,
+            minorVersion,
+            atomicUpdateGroupSize
+        )
+    );
+  }
+
+  private int nextRootPartitionId()
+  {
+    return nextRootPartitionId++;
+  }
+
+  private int nextNonRootPartitionId()
+  {
+    return nextNonRootPartitionId++;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java
index 335cd1e..e32a53c 100644
--- a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java
+++ b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.timeline.Overshadowable;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.ShardSpec;
 import org.joda.time.Interval;
 
 import java.io.Closeable;
@@ -70,9 +71,9 @@ public class ReferenceCountingSegment extends AbstractSegment implements Oversha
     }
   };
 
-  public ReferenceCountingSegment(Segment baseSegment)
+  public static ReferenceCountingSegment wrapRootGenerationSegment(Segment baseSegment)
   {
-    this(
+    return new ReferenceCountingSegment(
         Preconditions.checkNotNull(baseSegment, "baseSegment"),
         baseSegment.getId().getPartitionNum(),
         (baseSegment.getId().getPartitionNum() + 1),
@@ -81,7 +82,21 @@ public class ReferenceCountingSegment extends AbstractSegment implements Oversha
     );
   }
 
-  public ReferenceCountingSegment(
+  public static ReferenceCountingSegment wrapSegment(
+      Segment baseSegment,
+      ShardSpec shardSpec
+  )
+  {
+    return new ReferenceCountingSegment(
+        Preconditions.checkNotNull(baseSegment, "baseSegment"), // fail fast, as wrapRootGenerationSegment does
+        shardSpec.getStartRootPartitionId(),
+        shardSpec.getEndRootPartitionId(),
+        shardSpec.getMinorVersion(),
+        shardSpec.getAtomicUpdateGroupSize()
+    );
+  }
+
+  private ReferenceCountingSegment(
       Segment baseSegment,
       int startRootPartitionId,
       int endRootPartitionId,
diff --git a/processing/src/test/java/org/apache/druid/query/select/MultiSegmentSelectQueryTest.java b/processing/src/test/java/org/apache/druid/query/select/MultiSegmentSelectQueryTest.java
index 216de77..07cc52f 100644
--- a/processing/src/test/java/org/apache/druid/query/select/MultiSegmentSelectQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/select/MultiSegmentSelectQueryTest.java
@@ -148,9 +148,21 @@ public class MultiSegmentSelectQueryTest
     segment_override = new IncrementalIndexSegment(index2, makeIdentifier(index2, "v2"));
 
     VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = new VersionedIntervalTimeline<>(StringComparators.LEXICOGRAPHIC);
-    timeline.add(index0.getInterval(), "v1", new SingleElementPartitionChunk<>(new ReferenceCountingSegment(segment0)));
-    timeline.add(index1.getInterval(), "v1", new SingleElementPartitionChunk<>(new ReferenceCountingSegment(segment1)));
-    timeline.add(index2.getInterval(), "v2", new SingleElementPartitionChunk<>(new ReferenceCountingSegment(segment_override)));
+    timeline.add(
+        index0.getInterval(),
+        "v1",
+        new SingleElementPartitionChunk<>(ReferenceCountingSegment.wrapRootGenerationSegment(segment0))
+    );
+    timeline.add(
+        index1.getInterval(),
+        "v1",
+        new SingleElementPartitionChunk<>(ReferenceCountingSegment.wrapRootGenerationSegment(segment1))
+    );
+    timeline.add(
+        index2.getInterval(),
+        "v2",
+        new SingleElementPartitionChunk<>(ReferenceCountingSegment.wrapRootGenerationSegment(segment_override))
+    );
 
     segmentIdentifiers = new ArrayList<>();
     for (TimelineObjectHolder<String, ?> holder : timeline.lookup(Intervals.of("2011-01-12/2011-01-14"))) {
diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
index 325a477..e7b71e6 100644
--- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
@@ -145,8 +145,16 @@ public class TimeBoundaryQueryRunnerTest
     segment1 = new IncrementalIndexSegment(index1, makeIdentifier(index1, "v1"));
 
     VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = new VersionedIntervalTimeline<>(StringComparators.LEXICOGRAPHIC);
-    timeline.add(index0.getInterval(), "v1", new SingleElementPartitionChunk<>(new ReferenceCountingSegment(segment0)));
-    timeline.add(index1.getInterval(), "v1", new SingleElementPartitionChunk<>(new ReferenceCountingSegment(segment1)));
+    timeline.add(
+        index0.getInterval(),
+        "v1",
+        new SingleElementPartitionChunk<>(ReferenceCountingSegment.wrapRootGenerationSegment(segment0))
+    );
+    timeline.add(
+        index1.getInterval(),
+        "v1",
+        new SingleElementPartitionChunk<>(ReferenceCountingSegment.wrapRootGenerationSegment(segment1))
+    );
 
     return QueryRunnerTestHelper.makeFilteringQueryRunner(timeline, factory);
   }
diff --git a/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java b/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java
index 5a5ae7c..cee0a46 100644
--- a/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java
@@ -41,7 +41,7 @@ public class ReferenceCountingSegmentTest
   @Before
   public void setUp()
   {
-    segment = new ReferenceCountingSegment(
+    segment = ReferenceCountingSegment.wrapRootGenerationSegment(
         new AbstractSegment()
         {
           @Override
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
index 9be6d80..097d412 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
@@ -43,14 +43,16 @@ public class FireHydrant
   public FireHydrant(IncrementalIndex index, int count, SegmentId segmentId)
   {
     this.index = index;
-    this.adapter = new AtomicReference<>(new ReferenceCountingSegment(new IncrementalIndexSegment(index, segmentId)));
+    this.adapter = new AtomicReference<>(
+        ReferenceCountingSegment.wrapRootGenerationSegment(new IncrementalIndexSegment(index, segmentId))
+    );
     this.count = count;
   }
 
   public FireHydrant(Segment adapter, int count)
   {
     this.index = null;
-    this.adapter = new AtomicReference<>(new ReferenceCountingSegment(adapter));
+    this.adapter = new AtomicReference<>(ReferenceCountingSegment.wrapRootGenerationSegment(adapter));
     this.count = count;
   }
 
@@ -120,7 +122,7 @@ public class FireHydrant
         throw new ISE("Cannot swap to the same segment");
       }
       ReferenceCountingSegment newReferenceCountingSegment =
-          newSegment != null ? new ReferenceCountingSegment(newSegment) : null;
+          newSegment != null ? ReferenceCountingSegment.wrapRootGenerationSegment(newSegment) : null;
       if (adapter.compareAndSet(currentSegment, newReferenceCountingSegment)) {
         if (currentSegment != null) {
           currentSegment.close();
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index 73a9de1..04c84aa 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -172,11 +172,12 @@ public class SegmentManager
             log.warn("Told to load an adapter for segment[%s] that already exists", segment.getId());
             resultSupplier.set(false);
           } else {
-            final ReferenceCountingSegment referenceCountingSegment = new ReferenceCountingSegment(adapter);
             loadedIntervals.add(
                 segment.getInterval(),
                 segment.getVersion(),
-                segment.getShardSpec().createChunk(referenceCountingSegment)
+                segment.getShardSpec().createChunk(
+                    ReferenceCountingSegment.wrapSegment(adapter, segment.getShardSpec())
+                )
             );
             dataSourceState.addSegment(segment);
             resultSupplier.set(true);
@@ -226,15 +227,7 @@ public class SegmentManager
                 segment.getVersion(),
                 // remove() internally searches for a partitionChunk to remove which is *equal* to the given
                 // partitionChunk. Note that partitionChunk.equals() checks only the partitionNum, but not the object.
-                segment.getShardSpec().createChunk(
-                    new ReferenceCountingSegment(
-                        null,
-                        shardSpec.getStartRootPartitionId(),
-                        shardSpec.getEndRootPartitionId(),
-                        shardSpec.getMinorVersion(),
-                        shardSpec.getAtomicUpdateGroupSize()
-                    )
-                )
+                segment.getShardSpec().createChunk(ReferenceCountingSegment.wrapSegment(null, shardSpec))
             );
             final ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();
 
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index 14cbdbd..465bb73 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -36,6 +36,8 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
+import org.apache.druid.timeline.partition.PartitionIds;
 import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
@@ -383,6 +385,34 @@ public class SegmentManagerTest
     Assert.assertNull(segmentManager.getTimeline("nonExisting"));
   }
 
+  @Test
+  public void testLoadAndDropNonRootGenerationSegment() throws SegmentLoadingException
+  {
+    final DataSegment segment = new DataSegment(
+        "small_source",
+        Intervals.of("0/1000"),
+        "0",
+        ImmutableMap.of("interval", Intervals.of("0/1000"), "version", 0),
+        new ArrayList<>(),
+        new ArrayList<>(),
+        new NumberedOverwriteShardSpec(
+            PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + 10,
+            10,
+            20,
+            (short) 1,
+            (short) 1
+        ),
+        0,
+        10
+    );
+
+    segmentManager.loadSegment(segment);
+    assertResult(ImmutableList.of(segment));
+
+    segmentManager.dropSegment(segment);
+    assertResult(ImmutableList.of());
+  }
+
   @SuppressWarnings("RedundantThrows") // TODO remove when the bug in intelliJ is fixed.
   private void assertResult(List<DataSegment> expectedExistingSegments) throws SegmentLoadingException
   {
@@ -403,7 +433,9 @@ public class SegmentManagerTest
       expectedTimeline.add(
           segment.getInterval(),
           segment.getVersion(),
-          segment.getShardSpec().createChunk(new ReferenceCountingSegment(segmentLoader.getSegment(segment)))
+          segment.getShardSpec().createChunk(
+              ReferenceCountingSegment.wrapSegment(segmentLoader.getSegment(segment), segment.getShardSpec())
+          )
       );
     }
 
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
index 30858d6..155dc41 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -84,7 +84,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
     timeline.add(
         descriptor.getInterval(),
         descriptor.getVersion(),
-        descriptor.getShardSpec().createChunk(new ReferenceCountingSegment(segment))
+        descriptor.getShardSpec().createChunk(ReferenceCountingSegment.wrapSegment(segment, descriptor.getShardSpec()))
     );
     segments.add(descriptor);
     closeables.add(index);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org