You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2019/08/07 20:51:31 UTC
[incubator-druid] branch master updated: Fix bugs in
overshadowableManager and add unit tests (#8222)
This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8fa114c Fix bugs in overshadowableManager and add unit tests (#8222)
8fa114c is described below
commit 8fa114c34965e4ad19a3782b60f19ee2036ccb5a
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Wed Aug 7 13:51:21 2019 -0700
Fix bugs in overshadowableManager and add unit tests (#8222)
* Fix bugs in overshadowableManager and add unit tests
* Fix SegmentManager
* add segment manager test
* Address comments
* Address comments
---
.../timeline/partition/AtomicUpdateGroup.java | 6 +-
.../timeline/partition/OvershadowableManager.java | 647 ++++++++++--
.../druid/timeline/partition/PartitionHolder.java | 6 +-
.../timeline/VersionedIntervalTimelineTest.java | 114 +--
.../partition/IntegerPartitionChunkTest.java | 41 -
.../timeline/partition/OvershadowableInteger.java | 141 +++
.../partition/OvershadowableManagerTest.java | 1048 ++++++++++++++++++++
.../druid/segment/ReferenceCountingSegment.java | 21 +-
.../query/select/MultiSegmentSelectQueryTest.java | 18 +-
.../timeboundary/TimeBoundaryQueryRunnerTest.java | 12 +-
.../segment/ReferenceCountingSegmentTest.java | 2 +-
.../apache/druid/segment/realtime/FireHydrant.java | 8 +-
.../org/apache/druid/server/SegmentManager.java | 15 +-
.../apache/druid/server/SegmentManagerTest.java | 34 +-
.../util/SpecificSegmentsQuerySegmentWalker.java | 2 +-
15 files changed, 1832 insertions(+), 283 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/AtomicUpdateGroup.java b/core/src/main/java/org/apache/druid/timeline/partition/AtomicUpdateGroup.java
index aed7f15..1386c14 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/AtomicUpdateGroup.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/AtomicUpdateGroup.java
@@ -45,7 +45,7 @@ class AtomicUpdateGroup<T extends Overshadowable<T>> implements Overshadowable<A
// This may matter if there are a lot of segments to keep in memory as in brokers or the coordinator.
private final List<PartitionChunk<T>> chunks = new ArrayList<>();
- public AtomicUpdateGroup(PartitionChunk<T> chunk)
+ AtomicUpdateGroup(PartitionChunk<T> chunk)
{
this.chunks.add(chunk);
}
@@ -60,7 +60,7 @@ class AtomicUpdateGroup<T extends Overshadowable<T>> implements Overshadowable<A
}
for (PartitionChunk<T> existing : chunks) {
if (existing.equals(chunk)) {
- return;
+ throw new ISE("Can't add same chunk[%s] again", chunk);
}
}
chunks.add(chunk);
@@ -95,7 +95,7 @@ class AtomicUpdateGroup<T extends Overshadowable<T>> implements Overshadowable<A
}
@Nullable
- public PartitionChunk<T> findChunk(int partitionId)
+ PartitionChunk<T> findChunk(int partitionId)
{
return chunks.stream().filter(chunk -> chunk.getChunkNumber() == partitionId).findFirst().orElse(null);
}
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java b/core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java
index 97e8b9a..d3f090f 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java
@@ -19,6 +19,7 @@
package org.apache.druid.timeline.partition;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.objects.AbstractObjectCollection;
@@ -41,6 +42,7 @@ import org.apache.druid.timeline.Overshadowable;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -48,11 +50,13 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.Set;
import java.util.TreeMap;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
/**
- * OvershadowableManager manages the state of {@link AtomicUpdateGroup}. See the below {@link State} for details of
- * the possible state.
+ * OvershadowableManager manages the state of {@link AtomicUpdateGroup}. See the below {@link State} for details.
* Note that an AtomicUpdateGroup can consist of {@link Overshadowable}s of the same majorVersion, minorVersion,
* rootPartition range, and atomicUpdateGroupSize.
* In {@link org.apache.druid.timeline.VersionedIntervalTimeline}, this class is used to manage segments in the same
@@ -62,11 +66,22 @@ import java.util.TreeMap;
*/
class OvershadowableManager<T extends Overshadowable<T>>
{
- private enum State
+ /**
+ * There are 3 states for atomicUpdateGroups.
+ * There could be at most one visible atomicUpdateGroup at any time in a non-empty overshadowableManager.
+ *
+ * - Visible: fully available atomicUpdateGroup of the highest version if any.
+ * If there's no fully available atomicUpdateGroup, the standby atomicUpdateGroup of the highest version
+ * becomes visible.
+ * - Standby: all atomicUpdateGroups of higher versions than that of the visible atomicUpdateGroup.
+ * - Overshadowed: all atomicUpdateGroups of lower versions than that of the visible atomicUpdateGroup.
+ */
+ @VisibleForTesting
+ enum State
{
- STANDBY, // have atomicUpdateGroup of higher versions than visible
- VISIBLE, // have a single fully available atomicUpdateGroup of highest version
- OVERSHADOWED // have atomicUpdateGroup of lower versions than visible
+ STANDBY,
+ VISIBLE,
+ OVERSHADOWED
}
private final Map<Integer, PartitionChunk<T>> knownPartitionChunks; // served segments
@@ -92,6 +107,16 @@ class OvershadowableManager<T extends Overshadowable<T>>
this.overshadowedGroups = new TreeMap<>(other.overshadowedGroups);
}
+ private OvershadowableManager(List<AtomicUpdateGroup<T>> groups)
+ {
+ this();
+ for (AtomicUpdateGroup<T> entry : groups) {
+ for (PartitionChunk<T> chunk : entry.getChunks()) {
+ addChunk(chunk);
+ }
+ }
+ }
+
private TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> getStateMap(State state)
{
switch (state) {
@@ -125,7 +150,39 @@ class OvershadowableManager<T extends Overshadowable<T>>
Preconditions.checkArgument(!atomicUpdateGroup.isEmpty(), "empty atomicUpdateGroup");
removeFrom(atomicUpdateGroup, from);
- addAtomicUpdateGroupWithState(atomicUpdateGroup, to);
+ addAtomicUpdateGroupWithState(atomicUpdateGroup, to, false);
+ }
+
+ /**
+ * Replace the oldVisibleGroups with the newVisibleGroups.
+ * This method first removes the oldVisibleGroups from the visibles map,
+ * moves the newVisibleGroups from their old state map to the visibles map,
+ * and finally adds the oldVisibleGroups to their new state map.
+ */
+ private void replaceVisibleWith(
+ Collection<AtomicUpdateGroup<T>> oldVisibleGroups,
+ State newStateOfOldVisibleGroup,
+ List<AtomicUpdateGroup<T>> newVisibleGroups,
+ State oldStateOfNewVisibleGroups
+ )
+ {
+ oldVisibleGroups.forEach(
+ group -> {
+ if (!group.isEmpty()) {
+ removeFrom(group, State.VISIBLE);
+ }
+ }
+ );
+ newVisibleGroups.forEach(
+ entry -> transitAtomicUpdateGroupState(entry, oldStateOfNewVisibleGroups, State.VISIBLE)
+ );
+ oldVisibleGroups.forEach(
+ group -> {
+ if (!group.isEmpty()) {
+ addAtomicUpdateGroupWithState(group, newStateOfOldVisibleGroup, false);
+ }
+ }
+ );
}
/**
@@ -179,7 +236,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
return null;
}
- private List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> findOvershadowedBy(
+ private List<AtomicUpdateGroup<T>> findOvershadowedBy(
AtomicUpdateGroup<T> aug,
State fromState
)
@@ -189,89 +246,372 @@ class OvershadowableManager<T extends Overshadowable<T>>
}
/**
- * Find all atomicUpdateGroups of the given state overshadowed by the given rootPartitionRange and minorVersion.
+ * Find all atomicUpdateGroups of the given state overshadowed by the minorVersion in the given rootPartitionRange.
* The atomicUpdateGroup of a higher minorVersion can have a wider RootPartitionRange.
* To find all atomicUpdateGroups overshadowed by the given rootPartitionRange and minorVersion,
* we first need to find the first key contained by the given rootPartitionRange.
* Once we find such key, then we go through the entire map until we see an atomicUpdateGroup of which
* rootRangePartition is not contained by the given rootPartitionRange.
+ *
+ * @param rangeOfAug the partition range to search for overshadowed groups.
+ * @param minorVersion the minor version to check overshadow relation. The found groups will have lower minor versions
+ * than this.
+ * @param fromState the state to search for overshadowed groups.
+ *
+ * @return a list of found atomicUpdateGroups. It could be empty if no groups are found.
+ */
+ @VisibleForTesting
+ List<AtomicUpdateGroup<T>> findOvershadowedBy(RootPartitionRange rangeOfAug, short minorVersion, State fromState)
+ {
+ final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
+ Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = findLowestOverlappingEntry(
+ rangeOfAug,
+ stateMap,
+ true
+ );
+
+ if (current == null) {
+ return Collections.emptyList();
+ }
+
+ // Going through the map to find all entries of the RootPartitionRange contained by the given rangeOfAug.
+ // Note that RootPartitionRange of entries are always consecutive.
+ final List<AtomicUpdateGroup<T>> found = new ArrayList<>();
+ while (current != null && rangeOfAug.overlaps(current.getKey())) {
+ if (rangeOfAug.contains(current.getKey())) {
+ // versionToGroup is sorted by minorVersion.
+ // versionToGroup.headMap(minorVersion) below returns a map containing all entries of lower minorVersions
+ // than the given minorVersion.
+ final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = current.getValue();
+ // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, especially size(), is not optimized.
+ // Note that size() is indirectly called in ArrayList.addAll() when ObjectSortedSet.toArray() is called.
+ // See AbstractObjectCollection.toArray().
+ // If you see performance degradation here, probably we need to improve the below line.
+ if (versionToGroup.firstShortKey() < minorVersion) {
+ found.addAll(versionToGroup.headMap(minorVersion).values());
+ }
+ }
+ current = stateMap.higherEntry(current.getKey());
+ }
+ return found;
+ }
+
+ private List<AtomicUpdateGroup<T>> findOvershadows(AtomicUpdateGroup<T> aug, State fromState)
+ {
+ return findOvershadows(RootPartitionRange.of(aug), aug.getMinorVersion(), fromState);
+ }
+
+ /**
+ * Find all atomicUpdateGroups which overshadow others of the given minorVersion in the given rootPartitionRange.
+ * Similar to {@link #findOvershadowedBy}.
+ *
+ * Note that one atomicUpdateGroup can overshadow multiple other groups. If you're finding overshadowing
+ * atomicUpdateGroups by calling this method in a loop, the results of this method can contain duplicate groups.
+ *
+ * @param rangeOfAug the partition range to search for overshadowing groups.
+ * @param minorVersion the minor version to check overshadow relation. The found groups will have higher minor
+ * versions than this.
+ * @param fromState the state to search for overshadowing groups.
+ *
+ * @return a list of found atomicUpdateGroups. It could be empty if no groups are found.
+ */
+ @VisibleForTesting
+ List<AtomicUpdateGroup<T>> findOvershadows(RootPartitionRange rangeOfAug, short minorVersion, State fromState)
+ {
+ final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
+ Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = findLowestOverlappingEntry(
+ rangeOfAug,
+ stateMap,
+ false
+ );
+
+ if (current == null) {
+ return Collections.emptyList();
+ }
+
+ // Going through the map to find all entries whose RootPartitionRange contains the given rangeOfAug.
+ // Note that RootPartitionRange of entries are always consecutive.
+ final List<AtomicUpdateGroup<T>> found = new ArrayList<>();
+ while (current != null && current.getKey().overlaps(rangeOfAug)) {
+ if (current.getKey().contains(rangeOfAug)) {
+ // versionToGroup is sorted by minorVersion.
+ // versionToGroup.tailMap(minorVersion) below returns a map containing all entries of equal to or higher
+ // minorVersions than the given minorVersion.
+ final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = current.getValue();
+ // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, especially size(), is not optimized.
+ // Note that size() is indirectly called in ArrayList.addAll() when ObjectSortedSet.toArray() is called.
+ // See AbstractObjectCollection.toArray().
+ // If you see performance degradation here, probably we need to improve the below line.
+ if (versionToGroup.lastShortKey() > minorVersion) {
+ found.addAll(versionToGroup.tailMap(minorVersion).values());
+ }
+ }
+ current = stateMap.higherEntry(current.getKey());
+ }
+ return found;
+ }
+
+ /**
+ * Finds the lowest entry overlapping with the given root partition range.
+ * It first searches the entries lower than or equal to the given range.
+ * If there's no such entry lower than the given range, then it searches the entries higher than the given range.
+ *
+ * @return an entry of the lowest key overlapping with the given range. Otherwise null.
*/
- private List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> findOvershadowedBy(
+ @Nullable
+ private Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> findLowestOverlappingEntry(
RootPartitionRange rangeOfAug,
- short minorVersion,
- State fromState
+ TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap,
+ boolean strictSameStartId
)
{
- final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
+ // Searches the entries lower than or equal to the given range.
Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = stateMap.floorEntry(rangeOfAug);
if (current == null) {
- return Collections.emptyList();
+ // Searches the entries higher than the given range.
+ current = stateMap.higherEntry(rangeOfAug);
+ }
+
+ if (current == null) {
+ return null;
+ }
+
+ // floorEntry() can return the greatest key less than rangeOfAug. We need to skip non-overlapping keys.
+ while (current != null && !current.getKey().overlaps(rangeOfAug)) {
+ current = stateMap.higherEntry(current.getKey());
+ }
+
+ final BiPredicate<RootPartitionRange, RootPartitionRange> predicate;
+ if (strictSameStartId) {
+ predicate = (entryRange, groupRange) -> entryRange.startPartitionId == groupRange.startPartitionId;
+ } else {
+ predicate = RootPartitionRange::overlaps;
}
- // Find the first key for searching for overshadowed atomicUpdateGroup
- while (true) {
+ // There could be multiple entries of the same startPartitionId but different endPartitionId.
+ // Find the first key of the same startPartitionId which has the lowest endPartitionId.
+ while (current != null) {
final Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> lowerEntry = stateMap.lowerEntry(
current.getKey()
);
- if (lowerEntry != null && lowerEntry.getKey().startPartitionId == rangeOfAug.startPartitionId) {
+ if (lowerEntry != null && predicate.test(lowerEntry.getKey(), rangeOfAug)) {
current = lowerEntry;
} else {
break;
}
}
- // Going through the map to find all entries of the RootPartitionRange contained by the given rangeOfAug.
- // Note that RootPartitionRange of entries are always consecutive.
- final List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> found = new ArrayList<>();
- while (current != null && rangeOfAug.contains(current.getKey())) {
- // versionToGroup is sorted by minorVersion.
- // versionToGroup.subMap(firstKey, minorVersion) below returns a map containing all entries of lower minorVersions
- // than the given minorVersion.
- final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = current.getValue();
- // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, especially size(), is not optimized.
- // Note that size() is indirectly called in ArrayList.addAll() when ObjectSortedSet.toArray() is called.
- // See AbstractObjectCollection.toArray().
- // If you see performance degradation here, probably we need to improve the below line.
- found.addAll(versionToGroup.subMap(versionToGroup.firstShortKey(), minorVersion).short2ObjectEntrySet());
- current = stateMap.higherEntry(current.getKey());
- }
- return found;
+ return current;
}
/**
- * Handles addition of the atomicUpdateGroup to the given state
+ * Determine the visible group after a new chunk is added.
*/
- private void transitionStandbyGroupIfFull(AtomicUpdateGroup<T> aug, State stateOfAug)
+ private void determineVisibleGroupAfterAdd(AtomicUpdateGroup<T> aug, State stateOfAug)
{
if (stateOfAug == State.STANDBY) {
- // A standby atomicUpdateGroup becomes visible when its all segments are available.
- if (aug.isFull()) {
- // A visible atomicUpdateGroup becomes overshadowed when a fully available standby atomicUpdateGroup becomes
- // visible which overshadows the current visible one.
- findOvershadowedBy(aug, State.VISIBLE)
- .forEach(entry -> transitAtomicUpdateGroupState(entry.getValue(), State.VISIBLE, State.OVERSHADOWED));
- findOvershadowedBy(aug, State.STANDBY)
- .forEach(entry -> transitAtomicUpdateGroupState(entry.getValue(), State.STANDBY, State.OVERSHADOWED));
- transitAtomicUpdateGroupState(aug, State.STANDBY, State.VISIBLE);
+ moveNewStandbyToVisibleIfNecessary(aug, stateOfAug);
+ } else if (stateOfAug == State.OVERSHADOWED) {
+ checkVisibleIsFullyAvailableAndTryToMoveOvershadowedToVisible(aug, stateOfAug);
+ }
+ }
+
+ /**
+ * This method is called in {@link #determineVisibleGroupAfterAdd}.
+ * The given standby group can be visible in the below two cases:
+ *
+ * - The standby group is full. Since every standby group has a higher version than the current visible group,
+ * it should become visible immediately when it's full.
+ * - The standby group is not full but not empty and the current visible is not full. If there's no fully available
+ * group, the group of the highest version should be the visible.
+ */
+ private void moveNewStandbyToVisibleIfNecessary(AtomicUpdateGroup<T> standbyGroup, State stateOfGroup)
+ {
+ assert stateOfGroup == State.STANDBY;
+
+ // A standby atomicUpdateGroup becomes visible when all of its segments are available.
+ if (standbyGroup.isFull()) {
+ // A current visible atomicUpdateGroup becomes overshadowed when a fully available standby atomicUpdateGroup
+ // becomes visible.
+ replaceVisibleWith(
+ findOvershadowedBy(standbyGroup, State.VISIBLE),
+ State.OVERSHADOWED,
+ Collections.singletonList(standbyGroup),
+ State.STANDBY
+ );
+ findOvershadowedBy(standbyGroup, State.STANDBY)
+ .forEach(entry -> transitAtomicUpdateGroupState(entry, State.STANDBY, State.OVERSHADOWED));
+ } else {
+ // The given atomicUpdateGroup is in the standby state which means it's not overshadowed by the visible group.
+ // If the visible group is not fully available, then the new standby group should be visible since it has a
+ // higher minor version.
+ if (!standbyGroup.isEmpty()) {
+ // Check there are visible atomicUpdateGroups overshadowed by the given atomicUpdateGroup.
+ final List<AtomicUpdateGroup<T>> overshadowedVisibles = findOvershadowedBy(
+ standbyGroup,
+ State.VISIBLE
+ );
+ if (overshadowedVisibles.isEmpty()) {
+ // There is no visible atomicUpdateGroup for the rootPartitionRange of the given aug.
+ // The given aug should be visible.
+ transitAtomicUpdateGroupState(standbyGroup, State.STANDBY, State.VISIBLE);
+ findOvershadowedBy(standbyGroup, State.STANDBY)
+ .forEach(entry -> transitAtomicUpdateGroupState(entry, State.STANDBY, State.OVERSHADOWED));
+ } else {
+ // Check whether there is any missing chunk in the current visible groups.
+ // If the current visible groups don't cover the partition range of the given standby group,
+ // the given standby group should be visible.
+ final boolean fullyCoverAugRange = doGroupsFullyCoverPartitionRange(
+ overshadowedVisibles,
+ standbyGroup.getStartRootPartitionId(),
+ standbyGroup.getEndRootPartitionId()
+ );
+ if (!fullyCoverAugRange) {
+ replaceVisibleWith(
+ overshadowedVisibles,
+ State.OVERSHADOWED,
+ Collections.singletonList(standbyGroup),
+ State.STANDBY
+ );
+ findOvershadowedBy(standbyGroup, State.STANDBY)
+ .forEach(entry -> transitAtomicUpdateGroupState(entry, State.STANDBY, State.OVERSHADOWED));
+
+ }
+ // If all visible atomicUpdateGroups are full, then the given atomicUpdateGroup should stay in the standby
+ // state.
+ }
+ }
+ }
+ }
+
+ /**
+ * This method is called in {@link #determineVisibleGroupAfterAdd}. It first checks whether the current visible
+ * group is fully available. If not, it checks whether there are fully available overshadowed groups which can
+ * cover the rootPartitionRange of the visible groups. If it finds such groups, they become visible.
+ */
+ private void checkVisibleIsFullyAvailableAndTryToMoveOvershadowedToVisible(
+ AtomicUpdateGroup<T> group,
+ State stateOfGroup
+ )
+ {
+ assert stateOfGroup == State.OVERSHADOWED;
+ if (group.isFull()) {
+ // Since this atomicUpdateGroup is full, it could be changed to visible if the current visible group is not
+ // fully available. To check this, we first check whether the current visible group is fully available.
+ // If not, we check whether the overshadowed groups are fully available and can cover the partition range of
+ // the atomicUpdateGroups that overshadow the given overshadowed group.
+
+ // Visible or standby groups overshadowing the given group.
+ // Used to both 1) check fully available visible group and
+ // 2) get the partition range which the fully available overshadowed groups should cover to become visible.
+ final List<AtomicUpdateGroup<T>> groupsOvershadowingAug;
+ final boolean isOvershadowingGroupsFull;
+
+ final List<AtomicUpdateGroup<T>> overshadowingVisibles = findOvershadows(group, State.VISIBLE);
+ if (overshadowingVisibles.isEmpty()) {
+ final List<AtomicUpdateGroup<T>> overshadowingStandbys = findLatestNonFullyAvailableAtomicUpdateGroups(
+ findOvershadows(group, State.STANDBY)
+ );
+ if (overshadowingStandbys.isEmpty()) {
+ throw new ISE("WTH? atomicUpdateGroup[%s] is in overshadowed state, but no one overshadows it?", group);
+ }
+ groupsOvershadowingAug = overshadowingStandbys;
+ isOvershadowingGroupsFull = false;
+ } else {
+ groupsOvershadowingAug = overshadowingVisibles;
+ isOvershadowingGroupsFull = doGroupsFullyCoverPartitionRange(
+ groupsOvershadowingAug,
+ groupsOvershadowingAug.get(0).getStartRootPartitionId(),
+ groupsOvershadowingAug.get(groupsOvershadowingAug.size() - 1).getEndRootPartitionId()
+ );
+ }
+
+ // If groupsOvershadowingAug is the standby groups, isOvershadowingGroupsFull is always false.
+ // If groupsOvershadowingAug is the visible groups, isOvershadowingGroupsFull indicates whether the visible
+ // groups are fully available.
+ if (!isOvershadowingGroupsFull) {
+ // Let's check the overshadowed groups can cover the partition range of groupsOvershadowingAug
+ // and are fully available.
+ final List<AtomicUpdateGroup<T>> latestFullGroups = groupsOvershadowingAug
+ .stream()
+ .flatMap(eachFullgroup -> findLatestFullyAvailableOvershadowedAtomicUpdateGroups(
+ RootPartitionRange.of(eachFullgroup),
+ eachFullgroup.getMinorVersion()).stream()
+ )
+ .collect(Collectors.toList());
+
+ if (!latestFullGroups.isEmpty()) {
+ final boolean isOvershadowedGroupsFull = doGroupsFullyCoverPartitionRange(
+ latestFullGroups,
+ groupsOvershadowingAug.get(0).getStartRootPartitionId(),
+ groupsOvershadowingAug.get(groupsOvershadowingAug.size() - 1).getEndRootPartitionId()
+ );
+
+ if (isOvershadowedGroupsFull) {
+ replaceVisibleWith(overshadowingVisibles, State.STANDBY, latestFullGroups, State.OVERSHADOWED);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Checks if the given groups fully cover the given partition range. To fully cover the range, the given groups
+ * should satisfy the below:
+ *
+ * - All groups must be full.
+ * - All groups must be adjacent.
+ * - The lowest startPartitionId and the highest endPartitionId must be same with the given startPartitionId and
+ * the given endPartitionId, respectively.
+ *
+ * @param groups atomicUpdateGroups sorted by their rootPartitionRange
+ * @param startRootPartitionId the start partitionId of the root partition range to check the coverage
+ * @param endRootPartitionId the end partitionId of the root partition range to check the coverage
+ *
+ * @return true if the given groups fully cover the given partition range.
+ */
+ private boolean doGroupsFullyCoverPartitionRange(
+ List<AtomicUpdateGroup<T>> groups,
+ int startRootPartitionId,
+ int endRootPartitionId
+ )
+ {
+ final int startRootPartitionIdOfOvershadowed = groups.get(0).getStartRootPartitionId();
+ final int endRootPartitionIdOfOvershadowed = groups.get(groups.size() - 1).getEndRootPartitionId();
+ if (startRootPartitionId != startRootPartitionIdOfOvershadowed
+ || endRootPartitionId != endRootPartitionIdOfOvershadowed) {
+ return false;
+ } else {
+ int prevEndPartitionId = groups.get(0).getStartRootPartitionId();
+ for (AtomicUpdateGroup<T> group : groups) {
+ if (!group.isFull() || prevEndPartitionId != group.getStartRootPartitionId()) {
+ // If any group is not full, or does not start where the previous group ended, there is a gap in the
+ // coverage and the given groups can't fully cover the partition range.
+ return false;
+ }
+ prevEndPartitionId = group.getEndRootPartitionId();
}
}
+ return true;
}
- private void addAtomicUpdateGroupWithState(AtomicUpdateGroup<T> aug, State state)
+ private void addAtomicUpdateGroupWithState(AtomicUpdateGroup<T> aug, State state, boolean determineVisible)
{
final AtomicUpdateGroup<T> existing = getStateMap(state)
.computeIfAbsent(RootPartitionRange.of(aug), k -> createMinorVersionToAugMap(state))
.put(aug.getMinorVersion(), aug);
if (existing != null) {
- throw new ISE("AtomicUpdateGroup[%s] is already in state[%s]", aug, state);
+ throw new ISE("AtomicUpdateGroup[%s] is already in state[%s]", existing, state);
}
- transitionStandbyGroupIfFull(aug, state);
+ if (determineVisible) {
+ determineVisibleGroupAfterAdd(aug, state);
+ }
}
- public boolean addChunk(PartitionChunk<T> chunk)
+ boolean addChunk(PartitionChunk<T> chunk)
{
// Sanity check. ExistingChunk should be usually null.
final PartitionChunk<T> existingChunk = knownPartitionChunks.put(chunk.getChunkNumber(), chunk);
@@ -295,17 +635,36 @@ class OvershadowableManager<T extends Overshadowable<T>>
if (atomicUpdateGroup != null) {
atomicUpdateGroup.add(chunk);
+ // If overshadowed atomicUpdateGroup is full and visible atomicUpdateGroup is not full,
+ // move overshadowed one to visible.
+ determineVisibleGroupAfterAdd(atomicUpdateGroup, State.OVERSHADOWED);
} else {
atomicUpdateGroup = findAtomicUpdateGroupWith(chunk, State.STANDBY);
if (atomicUpdateGroup != null) {
atomicUpdateGroup.add(chunk);
- transitionStandbyGroupIfFull(atomicUpdateGroup, State.STANDBY);
+ determineVisibleGroupAfterAdd(atomicUpdateGroup, State.STANDBY);
} else {
atomicUpdateGroup = findAtomicUpdateGroupWith(chunk, State.VISIBLE);
if (atomicUpdateGroup != null) {
- atomicUpdateGroup.add(chunk);
+ if (atomicUpdateGroup.findChunk(chunk.getChunkNumber()) == null) {
+ // If this chunk is not in the atomicUpdateGroup, then we add the chunk to it if it's not full.
+ if (!atomicUpdateGroup.isFull()) {
+ atomicUpdateGroup.add(chunk);
+ } else {
+ throw new ISE("Can't add chunk[%s] to a full atomicUpdateGroup[%s]", chunk, atomicUpdateGroup);
+ }
+ } else {
+ // If this chunk is already in the atomicUpdateGroup, it should be in knownPartitionChunks
+ // and this code must not be executed.
+ throw new ISE(
+ "WTH? chunk[%s] is in the atomicUpdateGroup[%s] but not in knownPartitionChunks[%s]?",
+ chunk,
+ atomicUpdateGroup,
+ knownPartitionChunks
+ );
+ }
} else {
final AtomicUpdateGroup<T> newAtomicUpdateGroup = new AtomicUpdateGroup<>(chunk);
@@ -317,9 +676,9 @@ class OvershadowableManager<T extends Overshadowable<T>>
.anyMatch(group -> group.overshadows(newAtomicUpdateGroup));
if (overshadowed) {
- addAtomicUpdateGroupWithState(newAtomicUpdateGroup, State.OVERSHADOWED);
+ addAtomicUpdateGroupWithState(newAtomicUpdateGroup, State.OVERSHADOWED, true);
} else {
- addAtomicUpdateGroupWithState(newAtomicUpdateGroup, State.STANDBY);
+ addAtomicUpdateGroupWithState(newAtomicUpdateGroup, State.STANDBY, true);
}
}
}
@@ -328,7 +687,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
}
/**
- * Handles of removal of an empty atomicUpdateGroup from a state.
+ * Handles the removal of an empty atomicUpdateGroup from a state.
*/
private void determineVisibleGroupAfterRemove(
AtomicUpdateGroup<T> augOfRemovedChunk,
@@ -343,33 +702,101 @@ class OvershadowableManager<T extends Overshadowable<T>>
// is removed.
if (stateOfRemovedAug == State.VISIBLE) {
- // All segments in the visible atomicUpdateGroup which overshadows this atomicUpdateGroup is removed.
- // Fall back if there is a fully available overshadowed atomicUpdateGroup
-
- final List<AtomicUpdateGroup<T>> latestFullAugs = findLatestFullyAvailableOvershadowedAtomicUpdateGroup(
+ // A chunk is removed from the current visible group.
+ // Fall back to
+ // 1) the latest fully available overshadowed group if any
+ // 2) the latest standby group if any
+ // 3) the latest overshadowed group if any
+
+ // Check there is a fully available latest overshadowed atomicUpdateGroup.
+ final List<AtomicUpdateGroup<T>> latestFullAugs = findLatestFullyAvailableOvershadowedAtomicUpdateGroups(
rangeOfAug,
minorVersion
);
- // If there is no fully available fallback group, then the existing VISIBLE group remains VISIBLE.
- // Otherwise, the latest fully available group becomes VISIBLE.
+ // If there are fully available overshadowed groups, then the latest one becomes visible.
if (!latestFullAugs.isEmpty()) {
- // Move the atomicUpdateGroup to standby
- // and move the fully available overshadowed atomicUpdateGroup to visible
- if (!augOfRemovedChunk.isEmpty()) {
- transitAtomicUpdateGroupState(augOfRemovedChunk, State.VISIBLE, State.STANDBY);
+ // The current visible atomicUpdateGroup becomes standby
+ // and the fully available overshadowed atomicUpdateGroups become visible
+ final Set<AtomicUpdateGroup<T>> overshadowsLatestFullAugsInVisible = latestFullAugs
+ .stream()
+ .flatMap(group -> findOvershadows(group, State.VISIBLE).stream())
+ .collect(Collectors.toSet());
+ replaceVisibleWith(
+ overshadowsLatestFullAugsInVisible,
+ State.STANDBY,
+ latestFullAugs,
+ State.OVERSHADOWED
+ );
+ latestFullAugs
+ .stream()
+ .flatMap(group -> findOvershadows(group, State.OVERSHADOWED).stream())
+ .collect(Collectors.toSet())
+ .forEach(group -> transitAtomicUpdateGroupState(group, State.OVERSHADOWED, State.STANDBY));
+ } else {
+ // Find the latest non-fully available atomicUpdateGroups
+ final List<AtomicUpdateGroup<T>> latestStandby = findLatestNonFullyAvailableAtomicUpdateGroups(
+ findOvershadows(rangeOfAug, minorVersion, State.STANDBY)
+ );
+ if (!latestStandby.isEmpty()) {
+ final List<AtomicUpdateGroup<T>> overshadowedByLatestStandby = latestStandby
+ .stream()
+ .flatMap(group -> findOvershadowedBy(group, State.VISIBLE).stream())
+ .collect(Collectors.toList());
+ replaceVisibleWith(overshadowedByLatestStandby, State.OVERSHADOWED, latestStandby, State.STANDBY);
+
+ // All standby groups overshadowed by the new visible group should be moved to overshadowed
+ latestStandby
+ .stream()
+ .flatMap(group -> findOvershadowedBy(group, State.STANDBY).stream())
+ .collect(Collectors.toSet())
+ .forEach(aug -> transitAtomicUpdateGroupState(aug, State.STANDBY, State.OVERSHADOWED));
+ } else if (augOfRemovedChunk.isEmpty()) {
+ // Visible is empty. Move the latest overshadowed to visible.
+ final List<AtomicUpdateGroup<T>> latestOvershadowed = findLatestNonFullyAvailableAtomicUpdateGroups(
+ findOvershadowedBy(rangeOfAug, minorVersion, State.OVERSHADOWED)
+ );
+ if (!latestOvershadowed.isEmpty()) {
+ latestOvershadowed.forEach(aug -> transitAtomicUpdateGroupState(aug, State.OVERSHADOWED, State.VISIBLE));
+ }
}
- latestFullAugs.forEach(group -> transitAtomicUpdateGroupState(group, State.OVERSHADOWED, State.VISIBLE));
}
}
}
- private List<AtomicUpdateGroup<T>> findLatestFullyAvailableOvershadowedAtomicUpdateGroup(
+ /**
+ * Find the latest NON-FULLY available atomicUpdateGroups from the given groups.
+ *
+ * This method MUST be called only when there are no fully available ones in the given groups. If the given groups
+ * are in the overshadowed state, call {@link #findLatestFullyAvailableOvershadowedAtomicUpdateGroups} first
+ * to check whether there is any fully available group.
+ * If the given groups are in the standby state, you can freely call this method because there should be no fully
+ * available one in the standby groups at any time.
+ */
+ private List<AtomicUpdateGroup<T>> findLatestNonFullyAvailableAtomicUpdateGroups(List<AtomicUpdateGroup<T>> groups)
+ {
+ if (groups.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ final OvershadowableManager<T> manager = new OvershadowableManager<>(groups);
+ if (!manager.standbyGroups.isEmpty()) {
+ throw new ISE("This method should be called only when there is no fully available group in the given state.");
+ }
+
+ final List<AtomicUpdateGroup<T>> visibles = new ArrayList<>();
+ for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> map : manager.visibleGroup.values()) {
+ visibles.addAll(map.values());
+ }
+ return visibles;
+ }
+
+ private List<AtomicUpdateGroup<T>> findLatestFullyAvailableOvershadowedAtomicUpdateGroups(
RootPartitionRange rangeOfAug,
short minorVersion
)
{
- final List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> overshadowedGroups = findOvershadowedBy(
+ final List<AtomicUpdateGroup<T>> overshadowedGroups = findOvershadowedBy(
rangeOfAug,
minorVersion,
State.OVERSHADOWED
@@ -378,16 +805,22 @@ class OvershadowableManager<T extends Overshadowable<T>>
return Collections.emptyList();
}
- final OvershadowableManager<T> manager = new OvershadowableManager<>();
- for (Short2ObjectMap.Entry<AtomicUpdateGroup<T>> entry : overshadowedGroups) {
- for (PartitionChunk<T> chunk : entry.getValue().getChunks()) {
- manager.addChunk(chunk);
- }
- }
-
+ final OvershadowableManager<T> manager = new OvershadowableManager<>(overshadowedGroups);
final List<AtomicUpdateGroup<T>> visibles = new ArrayList<>();
for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> map : manager.visibleGroup.values()) {
- visibles.addAll(map.values());
+ for (AtomicUpdateGroup<T> atomicUpdateGroup : map.values()) {
+ if (!atomicUpdateGroup.isFull()) {
+ return Collections.emptyList();
+ }
+ visibles.add(atomicUpdateGroup);
+ }
+ }
+ final RootPartitionRange foundRange = RootPartitionRange.of(
+ visibles.get(0).getStartRootPartitionId(),
+ visibles.get(visibles.size() - 1).getEndRootPartitionId()
+ );
+ if (!rangeOfAug.equals(foundRange)) {
+ return Collections.emptyList();
}
return visibles;
}
@@ -419,7 +852,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
}
@Nullable
- public PartitionChunk<T> removeChunk(PartitionChunk<T> partitionChunk)
+ PartitionChunk<T> removeChunk(PartitionChunk<T> partitionChunk)
{
final PartitionChunk<T> knownChunk = knownPartitionChunks.get(partitionChunk.getChunkNumber());
if (knownChunk == null) {
@@ -461,7 +894,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
}
@Nullable
- public PartitionChunk<T> getChunk(int partitionId)
+ PartitionChunk<T> getChunk(int partitionId)
{
final PartitionChunk<T> chunk = knownPartitionChunks.get(partitionId);
if (chunk == null) {
@@ -480,26 +913,33 @@ class OvershadowableManager<T extends Overshadowable<T>>
}
}
- public List<PartitionChunk<T>> getVisibles()
+ List<PartitionChunk<T>> getVisibleChunks()
{
- final List<PartitionChunk<T>> visibles = new ArrayList<>();
- for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> treeMap : visibleGroup.values()) {
- for (AtomicUpdateGroup<T> aug : treeMap.values()) {
- visibles.addAll(aug.getChunks());
- }
- }
- return visibles;
+ return getAllChunks(visibleGroup);
+ }
+
+ List<PartitionChunk<T>> getOvershadowedChunks()
+ {
+ return getAllChunks(overshadowedGroups);
+ }
+
+ @VisibleForTesting
+ List<PartitionChunk<T>> getStandbyChunks()
+ {
+ return getAllChunks(standbyGroups);
}
- public List<PartitionChunk<T>> getOvershadowed()
+ private List<PartitionChunk<T>> getAllChunks(
+ TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap
+ )
{
- final List<PartitionChunk<T>> overshadowed = new ArrayList<>();
- for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> treeMap : overshadowedGroups.values()) {
+ final List<PartitionChunk<T>> allChunks = new ArrayList<>();
+ for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> treeMap : stateMap.values()) {
for (AtomicUpdateGroup<T> aug : treeMap.values()) {
- overshadowed.addAll(aug.getChunks());
+ allChunks.addAll(aug.getChunks());
}
}
- return overshadowed;
+ return allChunks;
}
@Override
@@ -535,11 +975,18 @@ class OvershadowableManager<T extends Overshadowable<T>>
'}';
}
- private static class RootPartitionRange implements Comparable<RootPartitionRange>
+ @VisibleForTesting
+ static class RootPartitionRange implements Comparable<RootPartitionRange>
{
private final short startPartitionId;
private final short endPartitionId;
+ @VisibleForTesting
+ static RootPartitionRange of(int startPartitionId, int endPartitionId)
+ {
+ return new RootPartitionRange((short) startPartitionId, (short) endPartitionId);
+ }
+
private static <T extends Overshadowable<T>> RootPartitionRange of(PartitionChunk<T> chunk)
{
return of(chunk.getObject().getStartRootPartitionId(), chunk.getObject().getEndRootPartitionId());
@@ -550,11 +997,6 @@ class OvershadowableManager<T extends Overshadowable<T>>
return of(aug.getStartRootPartitionId(), aug.getEndRootPartitionId());
}
- private static RootPartitionRange of(int startPartitionId, int endPartitionId)
- {
- return new RootPartitionRange((short) startPartitionId, (short) endPartitionId);
- }
-
private RootPartitionRange(short startPartitionId, short endPartitionId)
{
this.startPartitionId = startPartitionId;
@@ -563,7 +1005,16 @@ class OvershadowableManager<T extends Overshadowable<T>>
public boolean contains(RootPartitionRange that)
{
- return this.startPartitionId <= that.startPartitionId && this.endPartitionId >= that.endPartitionId;
+ return Short.toUnsignedInt(startPartitionId) <= Short.toUnsignedInt(that.startPartitionId)
+ && Short.toUnsignedInt(this.endPartitionId) >= Short.toUnsignedInt(that.endPartitionId);
+ }
+
+ public boolean overlaps(RootPartitionRange that)
+ {
+ return Short.toUnsignedInt(startPartitionId) <= Short.toUnsignedInt(that.startPartitionId)
+ && Short.toUnsignedInt(endPartitionId) > Short.toUnsignedInt(that.startPartitionId)
+ || Short.toUnsignedInt(startPartitionId) >= Short.toUnsignedInt(that.startPartitionId)
+ && Short.toUnsignedInt(startPartitionId) < Short.toUnsignedInt(that.endPartitionId);
}
@Override
@@ -638,7 +1089,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
if (toKey > key) {
return this;
} else {
- throw new IllegalArgumentException();
+ throw new IAE("toKey: %s, key: %s", toKey, key);
}
}
@@ -648,7 +1099,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
if (fromKey <= key) {
return this;
} else {
- throw new IllegalArgumentException();
+ throw new IAE("fromKey: %s, key: %s", fromKey, key);
}
}
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java b/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
index 26e34da..afa8460 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
@@ -113,13 +113,13 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
@Override
public Iterator<PartitionChunk<T>> iterator()
{
- return overshadowableManager.getVisibles().iterator();
+ return overshadowableManager.getVisibleChunks().iterator();
}
@Override
public Spliterator<PartitionChunk<T>> spliterator()
{
- return overshadowableManager.getVisibles().spliterator();
+ return overshadowableManager.getVisibleChunks().spliterator();
}
public Stream<PartitionChunk<T>> stream()
@@ -129,7 +129,7 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
public List<PartitionChunk<T>> getOvershadowed()
{
- return overshadowableManager.getOvershadowed();
+ return overshadowableManager.getOvershadowedChunks();
}
public Iterable<T> payloads()
diff --git a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java
index 6971833..28b2dcca 100644
--- a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.timeline.partition.ImmutablePartitionHolder;
import org.apache.druid.timeline.partition.IntegerPartitionChunk;
import org.apache.druid.timeline.partition.NumberedOverwritingPartitionChunk;
import org.apache.druid.timeline.partition.NumberedPartitionChunk;
+import org.apache.druid.timeline.partition.OvershadowableInteger;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.apache.druid.timeline.partition.PartitionIds;
@@ -48,7 +49,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.Objects;
import java.util.Set;
/**
@@ -2145,116 +2145,4 @@ public class VersionedIntervalTimelineTest
{
return new VersionedIntervalTimeline<>(Ordering.natural());
}
-
- private static class OvershadowableInteger implements Overshadowable<OvershadowableInteger>
- {
- private final String majorVersion;
- private final int partitionNum;
- private final int val;
- private final int startRootPartitionId;
- private final int endRootPartitionId;
- private final short minorVersion;
- private final short atomicUpdateGroupSize;
-
- private OvershadowableInteger(String majorVersion, int partitionNum, int val)
- {
- this(majorVersion, partitionNum, val, partitionNum, partitionNum + 1, 0, 1);
- }
-
- private OvershadowableInteger(
- String majorVersion,
- int partitionNum,
- int val,
- int startRootPartitionId,
- int endRootPartitionId,
- int minorVersion,
- int atomicUpdateGroupSize
- )
- {
- this.majorVersion = majorVersion;
- this.partitionNum = partitionNum;
- this.val = val;
- this.startRootPartitionId = startRootPartitionId;
- this.endRootPartitionId = endRootPartitionId;
- this.minorVersion = (short) minorVersion;
- this.atomicUpdateGroupSize = (short) atomicUpdateGroupSize;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- OvershadowableInteger that = (OvershadowableInteger) o;
- return partitionNum == that.partitionNum &&
- val == that.val &&
- startRootPartitionId == that.startRootPartitionId &&
- endRootPartitionId == that.endRootPartitionId &&
- minorVersion == that.minorVersion &&
- atomicUpdateGroupSize == that.atomicUpdateGroupSize &&
- Objects.equals(majorVersion, that.majorVersion);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(
- majorVersion,
- partitionNum,
- val,
- startRootPartitionId,
- endRootPartitionId,
- minorVersion,
- atomicUpdateGroupSize
- );
- }
-
- @Override
- public String toString()
- {
- return "OvershadowableInteger{" +
- "majorVersion='" + majorVersion + '\'' +
- ", partitionNum=" + partitionNum +
- ", val=" + val +
- ", startRootPartitionId=" + startRootPartitionId +
- ", endRootPartitionId=" + endRootPartitionId +
- ", minorVersion=" + minorVersion +
- ", atomicUpdateGroupSize=" + atomicUpdateGroupSize +
- '}';
- }
-
- @Override
- public int getStartRootPartitionId()
- {
- return startRootPartitionId;
- }
-
- @Override
- public int getEndRootPartitionId()
- {
- return endRootPartitionId;
- }
-
- @Override
- public String getVersion()
- {
- return majorVersion;
- }
-
- @Override
- public short getMinorVersion()
- {
- return minorVersion;
- }
-
- @Override
- public short getAtomicUpdateGroupSize()
- {
- return atomicUpdateGroupSize;
- }
- }
}
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/IntegerPartitionChunkTest.java b/core/src/test/java/org/apache/druid/timeline/partition/IntegerPartitionChunkTest.java
index 04e4ccf..b783439 100644
--- a/core/src/test/java/org/apache/druid/timeline/partition/IntegerPartitionChunkTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/partition/IntegerPartitionChunkTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.timeline.partition;
-import org.apache.druid.timeline.Overshadowable;
import org.junit.Assert;
import org.junit.Test;
@@ -110,44 +109,4 @@ public class IntegerPartitionChunkTest
Assert.assertEquals(make(10, null, 0, 1), make(10, null, 0, 1));
Assert.assertEquals(make(10, 11, 0, 1), make(10, 11, 0, 1));
}
-
- private static class OvershadowableInteger implements Overshadowable<OvershadowableInteger>
- {
- private final int val;
-
- OvershadowableInteger(int val)
- {
- this.val = val;
- }
-
- @Override
- public int getStartRootPartitionId()
- {
- return 0;
- }
-
- @Override
- public int getEndRootPartitionId()
- {
- return 1;
- }
-
- @Override
- public String getVersion()
- {
- return "";
- }
-
- @Override
- public short getMinorVersion()
- {
- return 0;
- }
-
- @Override
- public short getAtomicUpdateGroupSize()
- {
- return 1;
- }
- }
}
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableInteger.java b/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableInteger.java
new file mode 100644
index 0000000..1eca972
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableInteger.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import org.apache.druid.timeline.Overshadowable;
+
+import java.util.Objects;
+
+public class OvershadowableInteger implements Overshadowable<OvershadowableInteger>
+{
+ private final String majorVersion;
+ private final int partitionNum;
+ private final int val;
+ private final int startRootPartitionId;
+ private final int endRootPartitionId;
+ private final short minorVersion;
+ private final short atomicUpdateGroupSize;
+
+ public OvershadowableInteger(int val)
+ {
+ this("", 0, val);
+ }
+
+ public OvershadowableInteger(String majorVersion, int partitionNum, int val)
+ {
+ this(majorVersion, partitionNum, val, partitionNum, partitionNum + 1, 0, 1);
+ }
+
+ public OvershadowableInteger(
+ String majorVersion,
+ int partitionNum,
+ int val,
+ int startRootPartitionId,
+ int endRootPartitionId,
+ int minorVersion,
+ int atomicUpdateGroupSize
+ )
+ {
+ this.majorVersion = majorVersion;
+ this.partitionNum = partitionNum;
+ this.val = val;
+ this.startRootPartitionId = startRootPartitionId;
+ this.endRootPartitionId = endRootPartitionId;
+ this.minorVersion = (short) minorVersion;
+ this.atomicUpdateGroupSize = (short) atomicUpdateGroupSize;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OvershadowableInteger that = (OvershadowableInteger) o;
+ return partitionNum == that.partitionNum &&
+ val == that.val &&
+ startRootPartitionId == that.startRootPartitionId &&
+ endRootPartitionId == that.endRootPartitionId &&
+ minorVersion == that.minorVersion &&
+ atomicUpdateGroupSize == that.atomicUpdateGroupSize &&
+ Objects.equals(majorVersion, that.majorVersion);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(
+ majorVersion,
+ partitionNum,
+ val,
+ startRootPartitionId,
+ endRootPartitionId,
+ minorVersion,
+ atomicUpdateGroupSize
+ );
+ }
+
+ @Override
+ public String toString()
+ {
+ return "OvershadowableInteger{" +
+ "majorVersion='" + majorVersion + '\'' +
+ ", partitionNum=" + partitionNum +
+ ", val=" + val +
+ ", startRootPartitionId=" + startRootPartitionId +
+ ", endRootPartitionId=" + endRootPartitionId +
+ ", minorVersion=" + minorVersion +
+ ", atomicUpdateGroupSize=" + atomicUpdateGroupSize +
+ '}';
+ }
+
+ @Override
+ public int getStartRootPartitionId()
+ {
+ return startRootPartitionId;
+ }
+
+ @Override
+ public int getEndRootPartitionId()
+ {
+ return endRootPartitionId;
+ }
+
+ @Override
+ public String getVersion()
+ {
+ return majorVersion;
+ }
+
+ @Override
+ public short getMinorVersion()
+ {
+ return minorVersion;
+ }
+
+ @Override
+ public short getAtomicUpdateGroupSize()
+ {
+ return atomicUpdateGroupSize;
+ }
+}
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableManagerTest.java b/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableManagerTest.java
new file mode 100644
index 0000000..71566d0
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableManagerTest.java
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.timeline.partition.OvershadowableManager.RootPartitionRange;
+import org.apache.druid.timeline.partition.OvershadowableManager.State;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class OvershadowableManagerTest
+{
+ private static final String MAJOR_VERSION = "version";
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ private OvershadowableManager<OvershadowableInteger> manager;
+ private int nextRootPartitionId;
+ private int nextNonRootPartitionId;
+ private List<PartitionChunk<OvershadowableInteger>> expectedVisibleChunks;
+ private List<PartitionChunk<OvershadowableInteger>> expectedOvershadowedChunks;
+ private List<PartitionChunk<OvershadowableInteger>> expectedStandbyChunks;
+
+ @Before
+ public void setup()
+ {
+ manager = new OvershadowableManager<>();
+ nextRootPartitionId = PartitionIds.ROOT_GEN_START_PARTITION_ID;
+ nextNonRootPartitionId = PartitionIds.NON_ROOT_GEN_START_PARTITION_ID;
+ expectedVisibleChunks = new ArrayList<>();
+ expectedOvershadowedChunks = new ArrayList<>();
+ expectedStandbyChunks = new ArrayList<>();
+ }
+
+ @Test
+ public void testFindOvershadowedBy()
+ {
+ final List<PartitionChunk<OvershadowableInteger>> expectedOvershadowedChunks = new ArrayList<>();
+
+ // All chunks except the last one are in the overshadowed state
+ PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(0, 2, 1, 1);
+ manager.addChunk(chunk);
+ chunk = newNonRootChunk(0, 3, 2, 1);
+ manager.addChunk(chunk);
+ chunk = newNonRootChunk(0, 5, 3, 1);
+ manager.addChunk(chunk);
+ chunk = newNonRootChunk(5, 8, 1, 1);
+ expectedOvershadowedChunks.add(chunk);
+ manager.addChunk(chunk);
+ chunk = newNonRootChunk(8, 11, 2, 1);
+ manager.addChunk(chunk);
+ chunk = newNonRootChunk(5, 11, 3, 1);
+ manager.addChunk(chunk);
+ chunk = newNonRootChunk(0, 12, 5, 1);
+ manager.addChunk(chunk);
+
+ List<AtomicUpdateGroup<OvershadowableInteger>> overshadowedGroups = manager.findOvershadowedBy(
+ RootPartitionRange.of(2, 10),
+ (short) 10,
+ State.OVERSHADOWED
+ );
+ Assert.assertEquals(
+ expectedOvershadowedChunks.stream().map(AtomicUpdateGroup::new).collect(Collectors.toList()),
+ overshadowedGroups
+ );
+
+ overshadowedGroups = manager.findOvershadowedBy(
+ RootPartitionRange.of(2, 10),
+ (short) 10,
+ State.VISIBLE
+ );
+ Assert.assertEquals(
+ Collections.emptyList(),
+ overshadowedGroups
+ );
+ }
+
+ @Test
+ public void testFindOvershadows()
+ {
+ PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(2, 6, 3, 1);
+ manager.addChunk(chunk);
+ chunk = newNonRootChunk(6, 8, 3, 1);
+ manager.addChunk(chunk);
+ chunk = newNonRootChunk(1, 8, 4, 1);
+ final PartitionChunk<OvershadowableInteger> visibleChunk = chunk;
+ manager.addChunk(chunk);
+
+ List<AtomicUpdateGroup<OvershadowableInteger>> overshadowingGroups = manager.findOvershadows(
+ RootPartitionRange.of(1, 3),
+ (short) 1,
+ State.OVERSHADOWED
+ );
+ Assert.assertEquals(
+ Collections.emptyList(),
+ overshadowingGroups
+ );
+ overshadowingGroups = manager.findOvershadows(
+ RootPartitionRange.of(1, 3),
+ (short) 1,
+ State.VISIBLE
+ );
+ Assert.assertEquals(
+ ImmutableList.of(new AtomicUpdateGroup<>(visibleChunk)),
+ overshadowingGroups
+ );
+
+ overshadowingGroups = manager.findOvershadows(
+ RootPartitionRange.of(4, 7),
+ (short) 1,
+ State.OVERSHADOWED
+ );
+ Assert.assertEquals(
+ Collections.emptyList(),
+ overshadowingGroups
+ );
+ overshadowingGroups = manager.findOvershadows(
+ RootPartitionRange.of(4, 7),
+ (short) 1,
+ State.VISIBLE
+ );
+ Assert.assertEquals(
+ ImmutableList.of(new AtomicUpdateGroup<>(visibleChunk)),
+ overshadowingGroups
+ );
+ }
+
+ @Test
+ public void testAddRootChunkToEmptyManager()
+ {
+ Assert.assertTrue(manager.isEmpty());
+ // Add a new one
+ PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+ Assert.assertTrue(addVisibleToManager(chunk));
+ assertManagerState();
+ Assert.assertTrue(manager.isComplete());
+ // Add a duplicate
+ Assert.assertFalse(manager.addChunk(chunk));
+ // Add a new one
+ chunk = newRootChunk();
+ Assert.assertTrue(addVisibleToManager(chunk));
+ assertManagerState();
+ Assert.assertTrue(manager.isComplete());
+ }
+
+ @Test
+ public void testAddNonRootChunkToEmptyManager()
+ {
+ Assert.assertTrue(manager.isEmpty());
+ // Add a new one, atomicUpdateGroup is not full
+ PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(10, 12, 1, 3);
+ Assert.assertTrue(addVisibleToManager(chunk));
+ assertManagerState();
+ Assert.assertFalse(manager.isComplete());
+ // Add a new one, atomicUpdateGroup is still not full
+ chunk = newNonRootChunk(10, 12, 1, 3);
+ Assert.assertTrue(addVisibleToManager(chunk));
+ assertManagerState();
+ Assert.assertFalse(manager.isComplete());
+ // Add a new one, now atomicUpdateGroup is full
+ chunk = newNonRootChunk(10, 12, 1, 3);
+ Assert.assertTrue(addVisibleToManager(chunk));
+ assertManagerState();
+ Assert.assertTrue(manager.isComplete());
+
+ // Add a new one to the full group
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Can't add chunk");
+ chunk = newNonRootChunk(10, 12, 1, 3);
+ addVisibleToManager(chunk);
+ }
+
+ @Test
+ public void testRemoveFromEmptyManager()
+ {
+ Assert.assertTrue(manager.isEmpty());
+ PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+ Assert.assertNull(manager.removeChunk(chunk));
+ }
+
+ @Test
+ public void testAddOvershadowedChunkToCompletePartition()
+ {
+ // Start with a non-root incomplete partitionChunk
+ PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(0, 3, 1, 2);
+ Assert.assertTrue(addVisibleToManager(chunk));
+ assertManagerState();
+
+ // Add another non-root chunk of the same group, now this group is complete
+ chunk = newNonRootChunk(0, 3, 1, 2);
+ Assert.assertTrue(addVisibleToManager(chunk));
+ assertManagerState();
+
+ // Add an overshadowed chunk
+ nextRootPartitionId = 1;
+ chunk = newRootChunk();
+ Assert.assertTrue(manager.addChunk(chunk));
+ expectedOvershadowedChunks.add(chunk);
+ assertManagerState();
+ }
+
+ @Test
+ public void testAddOvershadowedChunkToIncompletePartition()
+ {
+ // Start with a non-root partitionChunk. This group is incomplete.
+ PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(0, 3, 1, 2);
+ Assert.assertTrue(addVisibleToManager(chunk));
+ assertManagerState();
+
+ // Add an overshadowed chunk
+ nextRootPartitionId = 1;
+ chunk = newRootChunk();
+ expectedOvershadowedChunks.add(chunk);
+ Assert.assertTrue(manager.addChunk(chunk));
+ assertManagerState();
+ }
+
+ @Test
+ public void testAddStandbyChunksToCompletePartition()
+ {
+ // Add complete chunks
+ PartitionChunk<OvershadowableInteger> chunk;
+ for (int i = 0; i < 3; i++) {
+ chunk = newRootChunk();
+ Assert.assertTrue(addVisibleToManager(chunk));
+ assertManagerState();
+ }
+
+ // Add a chunk of an incomplete group
+ chunk = newNonRootChunk(0, 3, 1, 2);
+ expectedStandbyChunks.add(chunk);
+ Assert.assertTrue(manager.addChunk(chunk));
+ assertManagerState();
+
+ // This group is now full
+ chunk = newNonRootChunk(0, 3, 1, 2);
+ expectedOvershadowedChunks.addAll(expectedVisibleChunks);
+ expectedVisibleChunks.clear();
+ expectedVisibleChunks.addAll(expectedStandbyChunks);
+ expectedStandbyChunks.clear();
+ Assert.assertTrue(addVisibleToManager(chunk));
+ assertManagerState();
+ }
+
+ @Test
+ public void testAddStandbyChunksToIncompletePartition()
+ {
+ // Add a chunk of an incomplete group
+ PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(0, 3, 1, 2);
+ Assert.assertTrue(addVisibleToManager(chunk));
+ assertManagerState();
+
+ // Add chunks of an incomplete group overshadowing the previous one
+ chunk = newNonRootChunk(0, 3, 2, 3);
+ expectedOvershadowedChunks.add(expectedVisibleChunks.remove(0));
+ Assert.assertTrue(addVisibleToManager(chunk));
+ assertManagerState();
+
+ chunk = newNonRootChunk(0, 3, 2, 3);
+ Assert.assertTrue(addVisibleToManager(chunk));
+ assertManagerState();
+ }
+
+ @Test
+ public void testRemoveUnknownChunk()
+ {
+ PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+ Assert.assertTrue(addVisibleToManager(chunk));
+ assertManagerState();
+
+ chunk = newRootChunk();
+ Assert.assertNull(manager.removeChunk(chunk));
+ assertManagerState();
+ }
+
+ @Test
+ public void testRemoveChunksUntilEmpty()
+ {
+ PartitionChunk<OvershadowableInteger> chunk;
+ for (int i = 0; i < 10; i++) {
+ chunk = newRootChunk();
+ Assert.assertTrue(addVisibleToManager(chunk));
+ assertManagerState();
+ }
+
+ while (expectedVisibleChunks.size() > 0) {
+ chunk = expectedVisibleChunks.remove(ThreadLocalRandom.current().nextInt(expectedVisibleChunks.size()));
+ Assert.assertEquals(chunk, manager.removeChunk(chunk));
+ assertManagerState();
+ }
+
+ Assert.assertTrue(manager.isEmpty());
+ }
+
+  @Test
+  public void testRemoveStandbyChunk()
+  {
+    // Three root chunks form the visible set.
+    for (int added = 0; added < 3; added++) {
+      final PartitionChunk<OvershadowableInteger> rootChunk = newRootChunk();
+      Assert.assertTrue(addVisibleToManager(rootChunk));
+      assertManagerState();
+    }
+
+    // Two out of three chunks of a group for root partition range [0, 3). The group stays incomplete,
+    // so both chunks are expected in standby.
+    for (int added = 0; added < 2; added++) {
+      final PartitionChunk<OvershadowableInteger> standbyChunk = newNonRootChunk(0, 3, 1, 3);
+      expectedStandbyChunks.add(standbyChunk);
+      Assert.assertTrue(manager.addChunk(standbyChunk));
+      assertManagerState();
+    }
+
+    // Drop the first standby chunk.
+    final PartitionChunk<OvershadowableInteger> removedChunk = expectedStandbyChunks.remove(0);
+    Assert.assertEquals(removedChunk, manager.removeChunk(removedChunk));
+    assertManagerState();
+  }
+
+  @Test
+  public void testRemoveVisibleChunkAndFallBackToStandby()
+  {
+    // Two root chunks make up the visible set.
+    for (int added = 0; added < 2; added++) {
+      final PartitionChunk<OvershadowableInteger> rootChunk = newRootChunk();
+      Assert.assertTrue(addVisibleToManager(rootChunk));
+      assertManagerState();
+    }
+
+    // Two out of three chunks of a group for root partition range [0, 2). The group is incomplete,
+    // so its chunks are expected in standby.
+    for (int added = 0; added < 2; added++) {
+      final PartitionChunk<OvershadowableInteger> standbyChunk = newNonRootChunk(0, 2, 1, 3);
+      expectedStandbyChunks.add(standbyChunk);
+      Assert.assertTrue(manager.addChunk(standbyChunk));
+      assertManagerState();
+    }
+
+    // Remove one of the visible root chunks. The remaining root chunk is expected to become overshadowed
+    // and the standby chunks are expected to become visible.
+    final PartitionChunk<OvershadowableInteger> removedChunk = expectedVisibleChunks.remove(0);
+    Assert.assertEquals(removedChunk, manager.removeChunk(removedChunk));
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    expectedVisibleChunks.addAll(expectedStandbyChunks);
+    expectedStandbyChunks.clear();
+    assertManagerState();
+  }
+
+  @Test
+  public void testAddCompleteOvershadowedToInCompletePartition()
+  {
+    // Two out of three chunks of a non-root group. Nothing else is present, so they are expected to be visible.
+    for (int added = 0; added < 2; added++) {
+      final PartitionChunk<OvershadowableInteger> incompleteGroupChunk = newNonRootChunk(0, 2, 1, 3);
+      Assert.assertTrue(addVisibleToManager(incompleteGroupChunk));
+      assertManagerState();
+    }
+
+    // The first root chunk alone is expected to be overshadowed.
+    final PartitionChunk<OvershadowableInteger> firstRootChunk = newRootChunk();
+    expectedOvershadowedChunks.add(firstRootChunk);
+    Assert.assertTrue(manager.addChunk(firstRootChunk));
+    assertManagerState();
+
+    // With the second root chunk, both root chunks are expected to be visible and the incomplete group
+    // is expected to move to standby.
+    final PartitionChunk<OvershadowableInteger> secondRootChunk = newRootChunk();
+    expectedStandbyChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    expectedVisibleChunks.add(expectedOvershadowedChunks.remove(0));
+    Assert.assertTrue(addVisibleToManager(secondRootChunk));
+    assertManagerState();
+  }
+
+  @Test
+  public void testAddCompleteOvershadowedToCompletePartition()
+  {
+    // Both chunks of a two-chunk non-root group; they are expected to be visible.
+    for (int added = 0; added < 2; added++) {
+      final PartitionChunk<OvershadowableInteger> visibleChunk = newNonRootChunk(0, 2, 1, 2);
+      Assert.assertTrue(addVisibleToManager(visibleChunk));
+      assertManagerState();
+    }
+
+    // Root chunks added afterwards are expected to stay overshadowed.
+    for (int added = 0; added < 2; added++) {
+      final PartitionChunk<OvershadowableInteger> rootChunk = newRootChunk();
+      expectedOvershadowedChunks.add(rootChunk);
+      Assert.assertTrue(manager.addChunk(rootChunk));
+      assertManagerState();
+    }
+  }
+
+  @Test
+  public void testRemoveChunkFromOvershadowd()
+  {
+    // Start root partition ids at 1 and add a single root chunk.
+    nextRootPartitionId = 1;
+    final PartitionChunk<OvershadowableInteger> rootChunk = newRootChunk();
+    Assert.assertTrue(addVisibleToManager(rootChunk));
+    assertManagerState();
+
+    // One chunk of a two-chunk group for root partition range [0, 2). It is expected to replace the root chunk
+    // in the visible set even though its group is incomplete.
+    final PartitionChunk<OvershadowableInteger> overwritingChunk = newNonRootChunk(0, 2, 1, 2);
+    expectedOvershadowedChunks.add(expectedVisibleChunks.remove(0));
+    Assert.assertTrue(addVisibleToManager(overwritingChunk));
+    assertManagerState();
+
+    // Remove the overshadowed root chunk.
+    final PartitionChunk<OvershadowableInteger> removedChunk = expectedOvershadowedChunks.remove(0);
+    Assert.assertEquals(removedChunk, manager.removeChunk(removedChunk));
+    assertManagerState();
+  }
+
+  @Test
+  public void testRemoveChunkFromCompleteParition()
+  {
+    // Start root partition ids at 1 and add a single root chunk.
+    nextRootPartitionId = 1;
+    final PartitionChunk<OvershadowableInteger> rootChunk = newRootChunk();
+    Assert.assertTrue(addVisibleToManager(rootChunk));
+    assertManagerState();
+
+    // First chunk of a two-chunk group for root partition range [0, 2); the root chunk is expected to be
+    // overshadowed from here on.
+    final PartitionChunk<OvershadowableInteger> firstOverwritingChunk = newNonRootChunk(0, 2, 1, 2);
+    expectedOvershadowedChunks.add(expectedVisibleChunks.remove(0));
+    Assert.assertTrue(addVisibleToManager(firstOverwritingChunk));
+    assertManagerState();
+
+    // Second chunk completes the overwriting group.
+    final PartitionChunk<OvershadowableInteger> secondOverwritingChunk = newNonRootChunk(0, 2, 1, 2);
+    Assert.assertTrue(addVisibleToManager(secondOverwritingChunk));
+    assertManagerState();
+
+    // Remove one chunk of the visible group; the other one is expected to stay visible.
+    final PartitionChunk<OvershadowableInteger> firstRemoved = expectedVisibleChunks.remove(0);
+    Assert.assertEquals(firstRemoved, manager.removeChunk(firstRemoved));
+    assertManagerState();
+
+    // Remove the last chunk of the visible group. Now the overshadowed group should be visible.
+    final PartitionChunk<OvershadowableInteger> lastRemoved = expectedVisibleChunks.remove(0);
+    expectedVisibleChunks.addAll(expectedOvershadowedChunks);
+    expectedOvershadowedChunks.clear();
+    Assert.assertEquals(lastRemoved, manager.removeChunk(lastRemoved));
+    assertManagerState();
+  }
+
+  @Test
+  public void testRemoveChunkFromCompletePartitionFallBackToOvershadowed()
+  {
+    // Two root chunks make up the initial visible set.
+    for (int added = 0; added < 2; added++) {
+      final PartitionChunk<OvershadowableInteger> rootChunk = newRootChunk();
+      Assert.assertTrue(addVisibleToManager(rootChunk));
+      assertManagerState();
+    }
+
+    // First chunk of a two-chunk group for root partition range [0, 2): expected in standby until the group
+    // is complete.
+    final PartitionChunk<OvershadowableInteger> firstOverwritingChunk = newNonRootChunk(0, 2, 1, 2);
+    expectedStandbyChunks.add(firstOverwritingChunk);
+    Assert.assertTrue(manager.addChunk(firstOverwritingChunk));
+    assertManagerState();
+
+    // Second chunk completes the group: the root chunks are expected to become overshadowed and the whole
+    // group is expected to become visible.
+    final PartitionChunk<OvershadowableInteger> secondOverwritingChunk = newNonRootChunk(0, 2, 1, 2);
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    expectedVisibleChunks.add(expectedStandbyChunks.remove(0));
+    Assert.assertTrue(addVisibleToManager(secondOverwritingChunk));
+    assertManagerState();
+
+    // Remove a visible chunk. Should fall back to the complete overshadowed group, while the rest of the
+    // previously visible group is expected in standby.
+    final PartitionChunk<OvershadowableInteger> removedChunk = expectedVisibleChunks.remove(0);
+    expectedStandbyChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    expectedVisibleChunks.addAll(expectedOvershadowedChunks);
+    expectedOvershadowedChunks.clear();
+    Assert.assertEquals(removedChunk, manager.removeChunk(removedChunk));
+    assertManagerState();
+  }
+
+ @Test
+ public void testAddCompleteOvershadowedToCompletePartition2()
+ {
+ // Add overshadowed incomplete groups
+ List<PartitionChunk<OvershadowableInteger>> chunks = newNonRootChunks(2, 0, 2, 1, 3);
+ expectedOvershadowedChunks.addAll(chunks);
+ chunks.forEach(manager::addChunk);
+ chunks = newNonRootChunks(2, 2, 5, 1, 3);
+ expectedOvershadowedChunks.addAll(chunks);
+ chunks.forEach(manager::addChunk);
+ chunks = newNonRootChunks(2, 5, 8, 1, 3);
+ expectedOvershadowedChunks.addAll(chunks);
+ chunks.forEach(manager::addChunk);
+ chunks = newNonRootChunks(2, 8, 10, 1, 3);
+ expectedOvershadowedChunks.addAll(chunks);
+ chunks.forEach(manager::addChunk);
+
+ chunks = newNonRootChunks(2, 0, 5, 2, 3);
+ expectedOvershadowedChunks.addAll(chunks);
+ chunks.forEach(manager::addChunk);
+
+ chunks = newNonRootChunks(2, 0, 8, 3, 3);
+ expectedOvershadowedChunks.addAll(chunks);
+ chunks.forEach(manager::addChunk);
+
+ // Add a visible complete group
+ chunks = newNonRootChunks(2, 0, 10, 4, 2);
+ expectedVisibleChunks.addAll(chunks);
+ chunks.forEach(manager::addChunk);
+
+ // Add a standby incomplete group
+ chunks = newNonRootChunks(1, 0, 10, 5, 2);
+ expectedStandbyChunks.addAll(chunks);
+ chunks.forEach(manager::addChunk);
+
+ assertManagerState();
+
+ // Add a chunk to complete the second overshadowed group
+ chunks = newNonRootChunks(1, 0, 5, 2, 3);
+ expectedOvershadowedChunks.addAll(chunks);
+ chunks.forEach(manager::addChunk);
+ chunks = newNonRootChunks(1, 5, 8, 1, 3);
+ expectedOvershadowedChunks.addAll(chunks);
+ chunks.forEach(manager::addChunk);
+ chunks = newNonRootChunks(1, 8, 10, 1, 3);
+ expectedOvershadowedChunks.addAll(chunks);
+ chunks.forEach(manager::addChunk);
+
+ assertManagerState();
+
+ // Remove a chunk from the visible group
+ PartitionChunk<OvershadowableInteger> chunkToRemove = expectedVisibleChunks.remove(0);
+ expectedStandbyChunks.addAll(expectedVisibleChunks);
+ expectedVisibleChunks.clear();
+ Iterator<PartitionChunk<OvershadowableInteger>> iterator = expectedOvershadowedChunks.iterator();
+ while (iterator.hasNext()) {
+ final PartitionChunk<OvershadowableInteger> chunk = iterator.next();
+ if (chunk.getObject().getStartRootPartitionId() == 0 && chunk.getObject().getMinorVersion() == 2
+ || chunk.getObject().getStartRootPartitionId() == 5 && chunk.getObject().getMinorVersion() == 1
+ || chunk.getObject().getStartRootPartitionId() == 8 && chunk.getObject().getMinorVersion() == 1) {
+ expectedVisibleChunks.add(chunk);
+ iterator.remove();
+ } else if (chunk.getObject().getStartRootPartitionId() == 0 && chunk.getObject().getMinorVersion() > 2
+ || chunk.getObject().getStartRootPartitionId() == 5 && chunk.getObject().getMinorVersion() > 1
+ || chunk.getObject().getStartRootPartitionId() == 8 && chunk.getObject().getMinorVersion() > 1) {
+ expectedStandbyChunks.add(chunk);
+ iterator.remove();
+ }
+ }
+ Assert.assertEquals(chunkToRemove, manager.removeChunk(chunkToRemove));
+ assertManagerState();
+ }
+
+  @Test
+  public void testAddCompleteStandbyToCompletePartition()
+  {
+    List<PartitionChunk<OvershadowableInteger>> group;
+
+    // Groups of minor version 1, two chunks each; all expected to be overshadowed.
+    // Each row is {startRootPartitionId, endRootPartitionId, atomicUpdateGroupSize}.
+    final int[][] minorVersionOneGroups = {{0, 2, 3}, {2, 5, 3}, {5, 8, 3}, {8, 10, 2}};
+    for (int[] spec : minorVersionOneGroups) {
+      group = newNonRootChunks(2, spec[0], spec[1], 1, spec[2]);
+      expectedOvershadowedChunks.addAll(group);
+      for (PartitionChunk<OvershadowableInteger> member : group) {
+        manager.addChunk(member);
+      }
+    }
+
+    // Complete groups of minor version 2 for [0, 5) and [5, 10); expected to be visible.
+    final int[][] visibleRanges = {{0, 5}, {5, 10}};
+    for (int[] range : visibleRanges) {
+      group = newNonRootChunks(2, range[0], range[1], 2, 2);
+      expectedVisibleChunks.addAll(group);
+      for (PartitionChunk<OvershadowableInteger> member : group) {
+        manager.addChunk(member);
+      }
+    }
+
+    // Incomplete groups (two out of three chunks each) of higher minor versions; expected in standby.
+    // Each row is {startRootPartitionId, endRootPartitionId, minorVersion}.
+    final int[][] standbyGroups = {{0, 5, 3}, {5, 10, 3}, {0, 5, 4}, {0, 10, 5}};
+    for (int[] spec : standbyGroups) {
+      group = newNonRootChunks(2, spec[0], spec[1], spec[2], 3);
+      expectedStandbyChunks.addAll(group);
+      for (PartitionChunk<OvershadowableInteger> member : group) {
+        manager.addChunk(member);
+      }
+    }
+
+    assertManagerState();
+
+    // Complete the standby groups [0, 5) of minor version 4 and [5, 10) of minor version 3. Those groups are
+    // expected to take over the visible set, and everything they overshadow is expected to be overshadowed.
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    for (PartitionChunk<OvershadowableInteger> member : newNonRootChunks(1, 0, 5, 4, 3)) {
+      addVisibleToManager(member);
+    }
+    for (PartitionChunk<OvershadowableInteger> member : newNonRootChunks(1, 5, 10, 3, 3)) {
+      addVisibleToManager(member);
+    }
+
+    final Iterator<PartitionChunk<OvershadowableInteger>> standbyIterator = expectedStandbyChunks.iterator();
+    while (standbyIterator.hasNext()) {
+      final PartitionChunk<OvershadowableInteger> candidate = standbyIterator.next();
+      final int startId = candidate.getObject().getStartRootPartitionId();
+      final int minor = candidate.getObject().getMinorVersion();
+      final boolean becomesVisible = (startId == 0 && minor == 4) || (startId == 5 && minor == 3);
+      final boolean becomesOvershadowed = (startId == 0 && minor < 4) || (startId == 5 && minor < 3);
+      if (becomesVisible) {
+        expectedVisibleChunks.add(candidate);
+        standbyIterator.remove();
+      } else if (becomesOvershadowed) {
+        expectedOvershadowedChunks.add(candidate);
+        standbyIterator.remove();
+      }
+    }
+
+    assertManagerState();
+  }
+
+  @Test
+  public void testFallBackToStandby2()
+  {
+    // Two out of three chunks of minor version 1; expected to be overshadowed.
+    List<PartitionChunk<OvershadowableInteger>> group = newNonRootChunks(2, 0, 2, 1, 3);
+    expectedOvershadowedChunks.addAll(group);
+    for (PartitionChunk<OvershadowableInteger> member : group) {
+      manager.addChunk(member);
+    }
+
+    // Complete group of minor version 2; expected to be visible.
+    group = newNonRootChunks(2, 0, 2, 2, 2);
+    expectedVisibleChunks.addAll(group);
+    for (PartitionChunk<OvershadowableInteger> member : group) {
+      manager.addChunk(member);
+    }
+
+    // Three incomplete groups (two out of three chunks each) of minor versions 3, 4 and 5; expected in standby.
+    for (int minorVersion = 3; minorVersion <= 5; minorVersion++) {
+      group = newNonRootChunks(2, 0, 2, minorVersion, 3);
+      expectedStandbyChunks.addAll(group);
+      for (PartitionChunk<OvershadowableInteger> member : group) {
+        manager.addChunk(member);
+      }
+    }
+
+    assertManagerState();
+
+    // Remove a visible chunk. The latest standby group should be visible, and everything else which was
+    // visible or in standby is expected to be overshadowed.
+    final PartitionChunk<OvershadowableInteger> chunkToRemove = expectedVisibleChunks.remove(0);
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    for (PartitionChunk<OvershadowableInteger> standbyChunk : expectedStandbyChunks) {
+      if (standbyChunk.getObject().getMinorVersion() == 5) {
+        expectedVisibleChunks.add(standbyChunk);
+      } else {
+        expectedOvershadowedChunks.add(standbyChunk);
+      }
+    }
+    // Every standby chunk has been moved to one of the other two lists.
+    expectedStandbyChunks.clear();
+
+    Assert.assertEquals(chunkToRemove, manager.removeChunk(chunkToRemove));
+    assertManagerState();
+  }
+
+  @Test
+  public void testAddAndOverwriteAndAdd()
+  {
+    // Start with five root chunks.
+    for (int added = 0; added < 5; added++) {
+      final PartitionChunk<OvershadowableInteger> rootChunk = newRootChunk();
+      Assert.assertTrue(addVisibleToManager(rootChunk));
+    }
+    assertManagerState();
+
+    // The root partition range to overwrite is taken from the chunk numbers of the second and fourth root chunks.
+    final int overwriteStart = expectedVisibleChunks.get(1).getChunkNumber();
+    final int overwriteEnd = expectedVisibleChunks.get(3).getChunkNumber();
+
+    // First chunk of a two-chunk group of minor version 3: expected in standby while the group is incomplete.
+    final PartitionChunk<OvershadowableInteger> firstOverwriting =
+        newNonRootChunk(overwriteStart, overwriteEnd, 3, 2);
+    Assert.assertTrue(manager.addChunk(firstOverwriting));
+    expectedStandbyChunks.add(firstOverwriting);
+    assertManagerState();
+
+    // Second chunk completes the group: the overwritten root chunks are expected to become overshadowed and
+    // both chunks of the group are expected to become visible.
+    final PartitionChunk<OvershadowableInteger> secondOverwriting =
+        newNonRootChunk(overwriteStart, overwriteEnd, 3, 2);
+    Assert.assertTrue(manager.addChunk(secondOverwriting));
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks.subList(1, 3));
+    expectedVisibleChunks.subList(1, 3).clear();
+    expectedVisibleChunks.addAll(expectedStandbyChunks);
+    expectedVisibleChunks.add(secondOverwriting);
+    expectedStandbyChunks.clear();
+    assertManagerState();
+
+    // Append three more root chunks; they are expected to be visible.
+    for (int added = 0; added < 3; added++) {
+      final PartitionChunk<OvershadowableInteger> rootChunk = newRootChunk();
+      Assert.assertTrue(addVisibleToManager(rootChunk));
+    }
+    assertManagerState();
+
+    // Add both chunks of a group of a lower minor version (2) for the same range; expected to be overshadowed.
+    for (int added = 0; added < 2; added++) {
+      final PartitionChunk<OvershadowableInteger> overshadowedChunk =
+          newNonRootChunk(overwriteStart, overwriteEnd, 2, 2);
+      expectedOvershadowedChunks.add(overshadowedChunk);
+      Assert.assertTrue(manager.addChunk(overshadowedChunk));
+      assertManagerState();
+    }
+  }
+
+  @Test
+  public void testRemoveOvershadowed()
+  {
+    // Start with five root chunks.
+    for (int added = 0; added < 5; added++) {
+      final PartitionChunk<OvershadowableInteger> rootChunk = newRootChunk();
+      Assert.assertTrue(addVisibleToManager(rootChunk));
+    }
+
+    // Overwrite the range between the chunk numbers of the second and fourth root chunks with a complete
+    // two-chunk group of minor version 1.
+    final int overwriteStart = expectedVisibleChunks.get(1).getChunkNumber();
+    final int overwriteEnd = expectedVisibleChunks.get(3).getChunkNumber();
+    for (int added = 0; added < 2; added++) {
+      final PartitionChunk<OvershadowableInteger> overwritingChunk =
+          newNonRootChunk(overwriteStart, overwriteEnd, 1, 2);
+      Assert.assertTrue(addVisibleToManager(overwritingChunk));
+    }
+    // The second and third root chunks are expected to be overshadowed.
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks.subList(1, 3));
+    expectedVisibleChunks.subList(1, 3).clear();
+    assertManagerState();
+
+    // Remove an overshadowed chunk.
+    final PartitionChunk<OvershadowableInteger> removedOvershadowed = expectedOvershadowedChunks.remove(0);
+    Assert.assertEquals(removedOvershadowed, manager.removeChunk(removedOvershadowed));
+    assertManagerState();
+
+    // Remove the first visible chunk which overshadows others, i.e., the first visible non-root chunk.
+    PartitionChunk<OvershadowableInteger> overshadowingChunk = null;
+    for (PartitionChunk<OvershadowableInteger> visibleChunk : expectedVisibleChunks) {
+      if (visibleChunk.getChunkNumber() >= PartitionIds.NON_ROOT_GEN_START_PARTITION_ID) {
+        overshadowingChunk = visibleChunk;
+        break;
+      }
+    }
+    if (overshadowingChunk != null) {
+      Assert.assertEquals(overshadowingChunk, removeVisibleFromManager(overshadowingChunk));
+    }
+    assertManagerState();
+  }
+
+  @Test
+  public void testRemoveOvershadowingVisible()
+  {
+    // Start with root partitionChunks
+    for (int i = 0; i < 5; i++) {
+      PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
+      Assert.assertTrue(addVisibleToManager(chunk));
+    }
+
+    // Overwrite some partitionChunks with a complete two-chunk group of a higher minor version
+    final int rootStartPartitionIdToOverwrite = expectedVisibleChunks.get(1).getChunkNumber();
+    final int rootEndPartitionIdToOverwrite = expectedVisibleChunks.get(3).getChunkNumber();
+    for (int i = 0; i < 2; i++) {
+      PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(
+          rootStartPartitionIdToOverwrite,
+          rootEndPartitionIdToOverwrite,
+          1,
+          2
+      );
+      Assert.assertTrue(addVisibleToManager(chunk));
+    }
+    // The second and third root chunks are expected to be overshadowed by the new group
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks.subList(1, 3));
+    expectedVisibleChunks.subList(1, 3).clear();
+    assertManagerState();
+
+    // Remove one chunk of the group that overshadows the root chunks. The other chunk of the group is expected
+    // to move to standby since its group is no longer complete.
+    boolean removed = false;
+    final Iterator<PartitionChunk<OvershadowableInteger>> iterator = expectedVisibleChunks.iterator();
+    while (iterator.hasNext()) {
+      final PartitionChunk<OvershadowableInteger> visibleChunk = iterator.next();
+      if (visibleChunk.getChunkNumber() >= PartitionIds.NON_ROOT_GEN_START_PARTITION_ID) {
+        iterator.remove();
+        if (!removed) {
+          // Verify that the manager hands back the removed chunk, like every other removal in this test class
+          Assert.assertEquals(visibleChunk, manager.removeChunk(visibleChunk));
+          removed = true;
+        } else {
+          expectedStandbyChunks.add(visibleChunk);
+        }
+      }
+    }
+    // The previously overshadowed root chunks are expected to be visible again
+    expectedVisibleChunks.addAll(expectedOvershadowedChunks);
+    expectedOvershadowedChunks.clear();
+    assertManagerState();
+  }
+
+  @Test
+  public void testFallBackToStandbyOnRemove()
+  {
+    // A single root chunk forms the visible set.
+    final PartitionChunk<OvershadowableInteger> rootChunk = newRootChunk();
+    Assert.assertTrue(addVisibleToManager(rootChunk));
+    assertManagerState();
+
+    // One chunk of a three-chunk atomicUpdateGroup of minor version 1; expected in standby.
+    final PartitionChunk<OvershadowableInteger> olderStandbyChunk = newNonRootChunk(0, 1, 1, 3);
+    expectedStandbyChunks.add(olderStandbyChunk);
+    Assert.assertTrue(manager.addChunk(olderStandbyChunk));
+    assertManagerState();
+
+    // One chunk of a two-chunk atomicUpdateGroup of minor version 2, which overshadows the previous one;
+    // expected in standby as well.
+    final PartitionChunk<OvershadowableInteger> newerStandbyChunk = newNonRootChunk(0, 1, 2, 2);
+    expectedStandbyChunks.add(newerStandbyChunk);
+    Assert.assertTrue(manager.addChunk(newerStandbyChunk));
+    assertManagerState();
+
+    // Remove the visible chunk. The newer standby chunk is expected to become visible and the older one
+    // is expected to become overshadowed.
+    final PartitionChunk<OvershadowableInteger> removedChunk = expectedVisibleChunks.remove(0);
+    expectedVisibleChunks.add(expectedStandbyChunks.remove(1));
+    expectedOvershadowedChunks.add(expectedStandbyChunks.remove(0));
+    Assert.assertEquals(removedChunk, manager.removeChunk(removedChunk));
+    assertManagerState();
+  }
+
+  @Test
+  public void testFallBackToOvershadowedOnRemove()
+  {
+    // Two out of three chunks of a non-root group of minor version 5; expected to be visible.
+    for (int added = 0; added < 2; added++) {
+      final PartitionChunk<OvershadowableInteger> visibleChunk = newNonRootChunk(10, 20, 5, 3);
+      Assert.assertTrue(addVisibleToManager(visibleChunk));
+    }
+    assertManagerState();
+
+    // Two out of three chunks for each of two older groups (minor versions 4 and 3), added alternately.
+    // Both groups are expected to be overshadowed by the visible one.
+    final int[] olderMinorVersions = {4, 3};
+    for (int round = 0; round < 2; round++) {
+      for (int minorVersion : olderMinorVersions) {
+        final PartitionChunk<OvershadowableInteger> olderChunk = newNonRootChunk(10, 20, minorVersion, 3);
+        expectedOvershadowedChunks.add(olderChunk);
+        Assert.assertTrue(manager.addChunk(olderChunk));
+      }
+    }
+    assertManagerState();
+
+    // Remove the first chunk of the visible group; the other one is expected to stay visible.
+    final PartitionChunk<OvershadowableInteger> firstRemoved = expectedVisibleChunks.remove(0);
+    Assert.assertEquals(firstRemoved, manager.removeChunk(firstRemoved));
+    assertManagerState();
+
+    // Remove the last chunk of the visible group. The group of minor version 4 is expected to become visible.
+    final PartitionChunk<OvershadowableInteger> lastRemoved = expectedVisibleChunks.remove(0);
+    for (PartitionChunk<OvershadowableInteger> overshadowedChunk : expectedOvershadowedChunks) {
+      if (overshadowedChunk.getObject().getMinorVersion() == 4) {
+        expectedVisibleChunks.add(overshadowedChunk);
+      }
+    }
+    expectedOvershadowedChunks.removeAll(expectedVisibleChunks);
+    Assert.assertEquals(lastRemoved, manager.removeChunk(lastRemoved));
+    assertManagerState();
+  }
+
+  @Test
+  public void testAddIncompleteAtomicUpdateGroups()
+  {
+    // One chunk of a three-chunk group of minor version 1; expected to be visible.
+    final PartitionChunk<OvershadowableInteger> firstGroupChunk = newNonRootChunk(0, 1, 1, 3);
+    Assert.assertTrue(addVisibleToManager(firstGroupChunk));
+    assertManagerState();
+
+    // One chunk of a two-chunk group of minor version 2, which overshadows the previous chunk.
+    // This group is completed at the end of this test.
+    final PartitionChunk<OvershadowableInteger> secondGroupFirstChunk = newNonRootChunk(0, 1, 2, 2);
+    expectedOvershadowedChunks.add(expectedVisibleChunks.remove(0));
+    Assert.assertTrue(addVisibleToManager(secondGroupFirstChunk));
+    assertManagerState();
+
+    // One chunk of a five-chunk group of minor version 3, which overshadows the previous chunk.
+    final PartitionChunk<OvershadowableInteger> thirdGroupChunk = newNonRootChunk(0, 1, 3, 5);
+    expectedOvershadowedChunks.add(expectedVisibleChunks.remove(0));
+    Assert.assertTrue(addVisibleToManager(thirdGroupChunk));
+    assertManagerState();
+
+    // Complete the group of minor version 2. It is expected to become visible, while the chunk of
+    // minor version 3 is expected to move to standby.
+    final PartitionChunk<OvershadowableInteger> secondGroupLastChunk = newNonRootChunk(0, 1, 2, 2);
+    expectedStandbyChunks.add(expectedVisibleChunks.remove(0));
+    final int lastOvershadowedIndex = expectedOvershadowedChunks.size() - 1;
+    expectedVisibleChunks.add(expectedOvershadowedChunks.remove(lastOvershadowedIndex));
+    Assert.assertTrue(addVisibleToManager(secondGroupLastChunk));
+    assertManagerState();
+  }
+
+  @Test
+  public void testMissingStartRootPartitionId()
+  {
+    // Simulate that the first two chunks are missing at the root level by starting root partition ids at 2.
+    nextRootPartitionId = 2;
+    final PartitionChunk<OvershadowableInteger> rootChunk = newRootChunk();
+    Assert.assertTrue(addVisibleToManager(rootChunk));
+    assertManagerState();
+
+    // Add a complete two-chunk group for root partition range [0, 3) which overshadows the root chunk.
+    expectedOvershadowedChunks.addAll(expectedVisibleChunks);
+    expectedVisibleChunks.clear();
+    for (int added = 0; added < 2; added++) {
+      final PartitionChunk<OvershadowableInteger> overwritingChunk = newNonRootChunk(0, 3, 1, 2);
+      Assert.assertTrue(addVisibleToManager(overwritingChunk));
+    }
+    assertManagerState();
+
+    // Remove the whole visible group. The root chunk is expected to be visible again.
+    for (int removedCount = 0; removedCount < 2; removedCount++) {
+      final PartitionChunk<OvershadowableInteger> removedChunk = expectedVisibleChunks.remove(0);
+      Assert.assertEquals(removedChunk, manager.removeChunk(removedChunk));
+    }
+    expectedVisibleChunks.addAll(expectedOvershadowedChunks);
+    expectedOvershadowedChunks.clear();
+    assertManagerState();
+  }
+
+ private boolean addVisibleToManager(PartitionChunk<OvershadowableInteger> chunk)
+ {
+ expectedVisibleChunks.add(chunk);
+ return manager.addChunk(chunk);
+ }
+
+ private PartitionChunk<OvershadowableInteger> removeVisibleFromManager(PartitionChunk<OvershadowableInteger> chunk)
+ {
+ expectedVisibleChunks.remove(chunk);
+ return manager.removeChunk(chunk);
+ }
+
+ private void assertManagerState()
+ {
+ Assert.assertEquals(
+ "Mismatched visible chunks",
+ new HashSet<>(expectedVisibleChunks),
+ new HashSet<>(manager.getVisibleChunks())
+ );
+ Assert.assertEquals(
+ "Mismatched overshadowed chunks",
+ new HashSet<>(expectedOvershadowedChunks),
+ new HashSet<>(manager.getOvershadowedChunks())
+ );
+ Assert.assertEquals(
+ "Mismatched standby chunks",
+ new HashSet<>(expectedStandbyChunks),
+ new HashSet<>(manager.getStandbyChunks())
+ );
+ }
+
+ private List<PartitionChunk<OvershadowableInteger>> newNonRootChunks(
+ int n,
+ int startPartitionId,
+ int endPartitionId,
+ int minorVersion,
+ int atomicUpdateGroupSize
+ )
+ {
+ return IntStream
+ .range(0, n)
+ .mapToObj(i -> newNonRootChunk(startPartitionId, endPartitionId, minorVersion, atomicUpdateGroupSize))
+ .collect(Collectors.toList());
+ }
+
+  /**
+   * Creates a root chunk which uses the next root partition id both as its chunk number and as the
+   * partition number of the wrapped {@link OvershadowableInteger}.
+   */
+  private NumberedPartitionChunk<OvershadowableInteger> newRootChunk()
+  {
+    final int partitionId = nextRootPartitionId();
+    final OvershadowableInteger payload = new OvershadowableInteger(MAJOR_VERSION, partitionId, 0);
+    return new NumberedPartitionChunk<>(partitionId, 0, payload);
+  }
+
+ private NumberedOverwritingPartitionChunk<OvershadowableInteger> newNonRootChunk(
+ int startRootPartitionId,
+ int endRootPartitionId,
+ int minorVersion,
+ int atomicUpdateGroupSize
+ )
+ {
+ final int partitionId = nextNonRootPartitionId();
+ return new NumberedOverwritingPartitionChunk<>(
+ partitionId,
+ new OvershadowableInteger(
+ MAJOR_VERSION,
+ partitionId,
+ 0,
+ startRootPartitionId,
+ endRootPartitionId,
+ minorVersion,
+ atomicUpdateGroupSize
+ )
+ );
+ }
+
+ private int nextRootPartitionId()
+ {
+ return nextRootPartitionId++;
+ }
+
+ private int nextNonRootPartitionId()
+ {
+ return nextNonRootPartitionId++;
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java
index 335cd1e..e32a53c 100644
--- a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java
+++ b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.Overshadowable;
import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import java.io.Closeable;
@@ -70,9 +71,9 @@ public class ReferenceCountingSegment extends AbstractSegment implements Oversha
}
};
- public ReferenceCountingSegment(Segment baseSegment)
+ public static ReferenceCountingSegment wrapRootGenerationSegment(Segment baseSegment)
{
- this(
+ return new ReferenceCountingSegment(
Preconditions.checkNotNull(baseSegment, "baseSegment"),
baseSegment.getId().getPartitionNum(),
(baseSegment.getId().getPartitionNum() + 1),
@@ -81,7 +82,21 @@ public class ReferenceCountingSegment extends AbstractSegment implements Oversha
);
}
- public ReferenceCountingSegment(
+ public static ReferenceCountingSegment wrapSegment( // Wraps baseSegment with the overshadowing metadata of shardSpec.
+ Segment baseSegment, // NOTE(review): SegmentManager passes null here when building a chunk for removal - confirm.
+ ShardSpec shardSpec
+ )
+ { // Root partition range, minor version, and atomicUpdateGroup size are all read from shardSpec.
+ return new ReferenceCountingSegment(
+ baseSegment,
+ shardSpec.getStartRootPartitionId(),
+ shardSpec.getEndRootPartitionId(),
+ shardSpec.getMinorVersion(),
+ shardSpec.getAtomicUpdateGroupSize()
+ );
+ }
+
+ private ReferenceCountingSegment(
Segment baseSegment,
int startRootPartitionId,
int endRootPartitionId,
diff --git a/processing/src/test/java/org/apache/druid/query/select/MultiSegmentSelectQueryTest.java b/processing/src/test/java/org/apache/druid/query/select/MultiSegmentSelectQueryTest.java
index 216de77..07cc52f 100644
--- a/processing/src/test/java/org/apache/druid/query/select/MultiSegmentSelectQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/select/MultiSegmentSelectQueryTest.java
@@ -148,9 +148,21 @@ public class MultiSegmentSelectQueryTest
segment_override = new IncrementalIndexSegment(index2, makeIdentifier(index2, "v2"));
VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = new VersionedIntervalTimeline<>(StringComparators.LEXICOGRAPHIC);
- timeline.add(index0.getInterval(), "v1", new SingleElementPartitionChunk<>(new ReferenceCountingSegment(segment0)));
- timeline.add(index1.getInterval(), "v1", new SingleElementPartitionChunk<>(new ReferenceCountingSegment(segment1)));
- timeline.add(index2.getInterval(), "v2", new SingleElementPartitionChunk<>(new ReferenceCountingSegment(segment_override)));
+ timeline.add(
+ index0.getInterval(),
+ "v1",
+ new SingleElementPartitionChunk<>(ReferenceCountingSegment.wrapRootGenerationSegment(segment0))
+ );
+ timeline.add(
+ index1.getInterval(),
+ "v1",
+ new SingleElementPartitionChunk<>(ReferenceCountingSegment.wrapRootGenerationSegment(segment1))
+ );
+ timeline.add(
+ index2.getInterval(),
+ "v2",
+ new SingleElementPartitionChunk<>(ReferenceCountingSegment.wrapRootGenerationSegment(segment_override))
+ );
segmentIdentifiers = new ArrayList<>();
for (TimelineObjectHolder<String, ?> holder : timeline.lookup(Intervals.of("2011-01-12/2011-01-14"))) {
diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
index 325a477..e7b71e6 100644
--- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
@@ -145,8 +145,16 @@ public class TimeBoundaryQueryRunnerTest
segment1 = new IncrementalIndexSegment(index1, makeIdentifier(index1, "v1"));
VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = new VersionedIntervalTimeline<>(StringComparators.LEXICOGRAPHIC);
- timeline.add(index0.getInterval(), "v1", new SingleElementPartitionChunk<>(new ReferenceCountingSegment(segment0)));
- timeline.add(index1.getInterval(), "v1", new SingleElementPartitionChunk<>(new ReferenceCountingSegment(segment1)));
+ timeline.add(
+ index0.getInterval(),
+ "v1",
+ new SingleElementPartitionChunk<>(ReferenceCountingSegment.wrapRootGenerationSegment(segment0))
+ );
+ timeline.add(
+ index1.getInterval(),
+ "v1",
+ new SingleElementPartitionChunk<>(ReferenceCountingSegment.wrapRootGenerationSegment(segment1))
+ );
return QueryRunnerTestHelper.makeFilteringQueryRunner(timeline, factory);
}
diff --git a/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java b/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java
index 5a5ae7c..cee0a46 100644
--- a/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java
@@ -41,7 +41,7 @@ public class ReferenceCountingSegmentTest
@Before
public void setUp()
{
- segment = new ReferenceCountingSegment(
+ segment = ReferenceCountingSegment.wrapRootGenerationSegment(
new AbstractSegment()
{
@Override
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
index 9be6d80..097d412 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
@@ -43,14 +43,16 @@ public class FireHydrant
public FireHydrant(IncrementalIndex index, int count, SegmentId segmentId)
{
this.index = index;
- this.adapter = new AtomicReference<>(new ReferenceCountingSegment(new IncrementalIndexSegment(index, segmentId)));
+ this.adapter = new AtomicReference<>(
+ ReferenceCountingSegment.wrapRootGenerationSegment(new IncrementalIndexSegment(index, segmentId))
+ );
this.count = count;
}
public FireHydrant(Segment adapter, int count)
{
this.index = null;
- this.adapter = new AtomicReference<>(new ReferenceCountingSegment(adapter));
+ this.adapter = new AtomicReference<>(ReferenceCountingSegment.wrapRootGenerationSegment(adapter));
this.count = count;
}
@@ -120,7 +122,7 @@ public class FireHydrant
throw new ISE("Cannot swap to the same segment");
}
ReferenceCountingSegment newReferenceCountingSegment =
- newSegment != null ? new ReferenceCountingSegment(newSegment) : null;
+ newSegment != null ? ReferenceCountingSegment.wrapRootGenerationSegment(newSegment) : null;
if (adapter.compareAndSet(currentSegment, newReferenceCountingSegment)) {
if (currentSegment != null) {
currentSegment.close();
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index 73a9de1..04c84aa 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -172,11 +172,12 @@ public class SegmentManager
log.warn("Told to load an adapter for segment[%s] that already exists", segment.getId());
resultSupplier.set(false);
} else {
- final ReferenceCountingSegment referenceCountingSegment = new ReferenceCountingSegment(adapter);
loadedIntervals.add(
segment.getInterval(),
segment.getVersion(),
- segment.getShardSpec().createChunk(referenceCountingSegment)
+ segment.getShardSpec().createChunk(
+ ReferenceCountingSegment.wrapSegment(adapter, segment.getShardSpec())
+ )
);
dataSourceState.addSegment(segment);
resultSupplier.set(true);
@@ -226,15 +227,7 @@ public class SegmentManager
segment.getVersion(),
// remove() internally searches for a partitionChunk to remove which is *equal* to the given
// partitionChunk. Note that partitionChunk.equals() checks only the partitionNum, but not the object.
- segment.getShardSpec().createChunk(
- new ReferenceCountingSegment(
- null,
- shardSpec.getStartRootPartitionId(),
- shardSpec.getEndRootPartitionId(),
- shardSpec.getMinorVersion(),
- shardSpec.getAtomicUpdateGroupSize()
- )
- )
+ segment.getShardSpec().createChunk(ReferenceCountingSegment.wrapSegment(null, shardSpec))
);
final ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index 14cbdbd..465bb73 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -36,6 +36,8 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
+import org.apache.druid.timeline.partition.PartitionIds;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
@@ -383,6 +385,34 @@ public class SegmentManagerTest
Assert.assertNull(segmentManager.getTimeline("nonExisting"));
}
+ @Test
+ public void testLoadAndDropNonRootGenerationSegment() throws SegmentLoadingException
+ {
+ final DataSegment segment = new DataSegment(
+ "small_source",
+ Intervals.of("0/1000"),
+ "0",
+ ImmutableMap.of("interval", Intervals.of("0/1000"), "version", 0),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ new NumberedOverwriteShardSpec(
+ PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + 10,
+ 10,
+ 20,
+ (short) 1,
+ (short) 1
+ ),
+ 0,
+ 10
+ );
+
+ segmentManager.loadSegment(segment);
+ assertResult(ImmutableList.of(segment));
+
+ segmentManager.dropSegment(segment);
+ assertResult(ImmutableList.of());
+ }
+
@SuppressWarnings("RedundantThrows") // TODO remove when the bug in intelliJ is fixed.
private void assertResult(List<DataSegment> expectedExistingSegments) throws SegmentLoadingException
{
@@ -403,7 +433,9 @@ public class SegmentManagerTest
expectedTimeline.add(
segment.getInterval(),
segment.getVersion(),
- segment.getShardSpec().createChunk(new ReferenceCountingSegment(segmentLoader.getSegment(segment)))
+ segment.getShardSpec().createChunk(
+ ReferenceCountingSegment.wrapSegment(segmentLoader.getSegment(segment), segment.getShardSpec())
+ )
);
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
index 30858d6..155dc41 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -84,7 +84,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
timeline.add(
descriptor.getInterval(),
descriptor.getVersion(),
- descriptor.getShardSpec().createChunk(new ReferenceCountingSegment(segment))
+ descriptor.getShardSpec().createChunk(ReferenceCountingSegment.wrapSegment(segment, descriptor.getShardSpec()))
);
segments.add(descriptor);
closeables.add(index);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org