You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2020/09/03 21:39:49 UTC

[GitHub] [geode] DonalEvans commented on a change in pull request #5496: GEODE-8465: secondary HARegionQueue to sync with primary queue

DonalEvans commented on a change in pull request #5496:
URL: https://github.com/apache/geode/pull/5496#discussion_r483156232



##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {
+    List<EventID> removedEvents = new LinkedList<>();
+    for (EventID eventId : eventIds) {
+      if (isRemoved(eventId)) {
+        removedEvents.add(eventId);
+      }
+    }
+    return removedEvents;
+  }
+
+  boolean isRemoved(EventID eventId) {
+    DispatchedAndCurrentEvents wrapper = getDispatchedAndCurrentEvents(eventId);
+    if (wrapper != null && eventId.getSequenceID() > wrapper.lastDispatchedSequenceId) {
+      return false;
+    }
+    return true;
+  }
+
+  DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+    ThreadIdentifier tid = getThreadIdentifier(eventId);
+    return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+  }
+
+  public void synchronizeQueueWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get() || synchronizeWithPrimaryInProgress.get()
+        || !doneGIIQueueing.get()) {
+      return;
+    }
+
+    if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Don't send to primary with version older than KnownVersion.GEODE_1_14_0");
+      }
+      return;
+    }
+    runSynchronizationWithPrimary(primary, cache);
+  }
+
+  void runSynchronizationWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    cache.getDistributionManager().getExecutors().getWaitingThreadPool()
+        .execute(() -> doSynchronizationWithPrimary(primary, cache));
+  }
+
+  synchronized void doSynchronizationWithPrimary(InternalDistributedMember primary,
+      InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get()) {

Review comment:
       Is it possible that when we get to this point, `synchronizeWithPrimaryInProgress` is true? Do we want to check its value before continuing, in case another thread has initiated the synchronization already?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {
+    List<EventID> removedEvents = new LinkedList<>();
+    for (EventID eventId : eventIds) {
+      if (isRemoved(eventId)) {
+        removedEvents.add(eventId);
+      }
+    }
+    return removedEvents;
+  }
+
+  boolean isRemoved(EventID eventId) {
+    DispatchedAndCurrentEvents wrapper = getDispatchedAndCurrentEvents(eventId);
+    if (wrapper != null && eventId.getSequenceID() > wrapper.lastDispatchedSequenceId) {
+      return false;
+    }
+    return true;
+  }
+
+  DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+    ThreadIdentifier tid = getThreadIdentifier(eventId);
+    return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+  }
+
+  public void synchronizeQueueWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get() || synchronizeWithPrimaryInProgress.get()
+        || !doneGIIQueueing.get()) {
+      return;
+    }
+
+    if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Don't send to primary with version older than KnownVersion.GEODE_1_14_0");
+      }
+      return;
+    }
+    runSynchronizationWithPrimary(primary, cache);
+  }
+
+  void runSynchronizationWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    cache.getDistributionManager().getExecutors().getWaitingThreadPool()
+        .execute(() -> doSynchronizationWithPrimary(primary, cache));
+  }
+
+  synchronized void doSynchronizationWithPrimary(InternalDistributedMember primary,
+      InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get()) {
+      return;
+    }
+    synchronizeWithPrimaryInProgress.set(true);
+    int maxChunkSize = 1000;
+
+    try {
+      List<EventID> giiEvents = getGIIEvents();
+      if (giiEvents.size() == 0) {
+        hasSynchronizedWithPrimary.set(true);
+        return;
+      }
+      Collection<List<EventID>> chunks = null;
+
+      if (giiEvents.size() > maxChunkSize) {
+        chunks = getChunks(giiEvents, maxChunkSize);
+      }
+
+      if (chunks == null) {
+        if (!removeDispatchedEvents(primary, cache, giiEvents)) {
+          return;
+        }
+      } else {
+        for (List<EventID> chunk : chunks) {
+          if (!removeDispatchedEvents(primary, cache, chunk)) {
+            return;
+          }
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("hasSynchronizedWithPrimary has synced with primary {} on queue: {} ", primary,
+            regionName);
+      }
+      hasSynchronizedWithPrimary.set(true);
+    } finally {
+      synchronizeWithPrimaryInProgress.set(false);
+    }
+  }
+
+  Collection<List<EventID>> getChunks(List<EventID> events, int size) {
+    AtomicInteger counter = new AtomicInteger(0);
+    return events.stream().collect(Collectors.groupingBy(event -> counter.getAndIncrement() / size))
+        .values();
+  }
+
+  boolean removeDispatchedEvents(InternalDistributedMember primary, InternalCache cache,
+      List<EventID> chunkEvents) {
+    List<EventID> dispatchedEvents = getDispatchedEventsFromPrimary(primary, cache, chunkEvents);
+
+    if (dispatchedEvents == null) {
+      // failed to get events from current primary, need to retry.
+      return false;
+    }
+
+    for (EventID id : dispatchedEvents) {
+      if (!removeDispatchedEventAfterSyncWithPrimary(id)) {
+        // failed to remove all dispatched events, need to retry
+        return false;
+      }
+    }
+    return true;
+  }
+
+  List<EventID> getDispatchedEventsFromPrimary(InternalDistributedMember primary,
+      InternalCache cache, List<EventID> chunkEvents) {
+    return QueueSynchronizationProcessor.getDispatchedEvents(cache.getDistributionManager(),
+        primary, regionName, chunkEvents);
+  }
+
+  List<EventID> getGIIEvents() {
+    List<EventID> events = new LinkedList<>();
+    for (long i = positionBeforeGII; i < positionAfterGII + 1; i++) {
+      Map.Entry entry = region.getEntry(i);

Review comment:
       The IDE warning on this line can be removed by making this `Map.Entry<?, ?> entry`

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {
+    List<EventID> removedEvents = new LinkedList<>();
+    for (EventID eventId : eventIds) {
+      if (isRemoved(eventId)) {
+        removedEvents.add(eventId);
+      }
+    }
+    return removedEvents;
+  }
+
+  boolean isRemoved(EventID eventId) {
+    DispatchedAndCurrentEvents wrapper = getDispatchedAndCurrentEvents(eventId);
+    if (wrapper != null && eventId.getSequenceID() > wrapper.lastDispatchedSequenceId) {
+      return false;
+    }
+    return true;
+  }
+
+  DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+    ThreadIdentifier tid = getThreadIdentifier(eventId);
+    return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+  }
+
+  public void synchronizeQueueWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get() || synchronizeWithPrimaryInProgress.get()
+        || !doneGIIQueueing.get()) {
+      return;
+    }
+
+    if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Don't send to primary with version older than KnownVersion.GEODE_1_14_0");
+      }
+      return;
+    }
+    runSynchronizationWithPrimary(primary, cache);
+  }
+
+  void runSynchronizationWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    cache.getDistributionManager().getExecutors().getWaitingThreadPool()
+        .execute(() -> doSynchronizationWithPrimary(primary, cache));
+  }
+
+  synchronized void doSynchronizationWithPrimary(InternalDistributedMember primary,
+      InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get()) {
+      return;
+    }
+    synchronizeWithPrimaryInProgress.set(true);
+    int maxChunkSize = 1000;
+
+    try {
+      List<EventID> giiEvents = getGIIEvents();
+      if (giiEvents.size() == 0) {
+        hasSynchronizedWithPrimary.set(true);
+        return;
+      }
+      Collection<List<EventID>> chunks = null;
+
+      if (giiEvents.size() > maxChunkSize) {
+        chunks = getChunks(giiEvents, maxChunkSize);
+      }
+
+      if (chunks == null) {
+        if (!removeDispatchedEvents(primary, cache, giiEvents)) {
+          return;
+        }
+      } else {
+        for (List<EventID> chunk : chunks) {
+          if (!removeDispatchedEvents(primary, cache, chunk)) {
+            return;
+          }
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("hasSynchronizedWithPrimary has synced with primary {} on queue: {} ", primary,

Review comment:
       For consistency with other suggested log output changes, this might be better as `logger.debug("HARegionQueue {} has synced with primary on {}", regionName, primary);`

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {
+    List<EventID> removedEvents = new LinkedList<>();
+    for (EventID eventId : eventIds) {
+      if (isRemoved(eventId)) {
+        removedEvents.add(eventId);
+      }
+    }
+    return removedEvents;
+  }
+
+  boolean isRemoved(EventID eventId) {
+    DispatchedAndCurrentEvents wrapper = getDispatchedAndCurrentEvents(eventId);
+    if (wrapper != null && eventId.getSequenceID() > wrapper.lastDispatchedSequenceId) {
+      return false;
+    }
+    return true;
+  }
+
+  DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+    ThreadIdentifier tid = getThreadIdentifier(eventId);
+    return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+  }
+
+  public void synchronizeQueueWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get() || synchronizeWithPrimaryInProgress.get()
+        || !doneGIIQueueing.get()) {
+      return;
+    }
+
+    if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Don't send to primary with version older than KnownVersion.GEODE_1_14_0");
+      }
+      return;
+    }
+    runSynchronizationWithPrimary(primary, cache);
+  }
+
+  void runSynchronizationWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    cache.getDistributionManager().getExecutors().getWaitingThreadPool()
+        .execute(() -> doSynchronizationWithPrimary(primary, cache));
+  }
+
+  synchronized void doSynchronizationWithPrimary(InternalDistributedMember primary,
+      InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get()) {
+      return;
+    }
+    synchronizeWithPrimaryInProgress.set(true);
+    int maxChunkSize = 1000;
+
+    try {
+      List<EventID> giiEvents = getGIIEvents();
+      if (giiEvents.size() == 0) {
+        hasSynchronizedWithPrimary.set(true);
+        return;
+      }
+      Collection<List<EventID>> chunks = null;
+
+      if (giiEvents.size() > maxChunkSize) {
+        chunks = getChunks(giiEvents, maxChunkSize);
+      }
+
+      if (chunks == null) {
+        if (!removeDispatchedEvents(primary, cache, giiEvents)) {
+          return;
+        }
+      } else {
+        for (List<EventID> chunk : chunks) {
+          if (!removeDispatchedEvents(primary, cache, chunk)) {
+            return;
+          }
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("hasSynchronizedWithPrimary has synced with primary {} on queue: {} ", primary,
+            regionName);
+      }
+      hasSynchronizedWithPrimary.set(true);
+    } finally {
+      synchronizeWithPrimaryInProgress.set(false);
+    }
+  }
+
+  Collection<List<EventID>> getChunks(List<EventID> events, int size) {
+    AtomicInteger counter = new AtomicInteger(0);
+    return events.stream().collect(Collectors.groupingBy(event -> counter.getAndIncrement() / size))
+        .values();
+  }
+
+  boolean removeDispatchedEvents(InternalDistributedMember primary, InternalCache cache,
+      List<EventID> chunkEvents) {
+    List<EventID> dispatchedEvents = getDispatchedEventsFromPrimary(primary, cache, chunkEvents);
+
+    if (dispatchedEvents == null) {
+      // failed to get events from current primary, need to retry.
+      return false;
+    }
+
+    for (EventID id : dispatchedEvents) {
+      if (!removeDispatchedEventAfterSyncWithPrimary(id)) {
+        // failed to remove all dispatched events, need to retry
+        return false;
+      }
+    }
+    return true;
+  }
+
+  List<EventID> getDispatchedEventsFromPrimary(InternalDistributedMember primary,
+      InternalCache cache, List<EventID> chunkEvents) {
+    return QueueSynchronizationProcessor.getDispatchedEvents(cache.getDistributionManager(),
+        primary, regionName, chunkEvents);
+  }
+
+  List<EventID> getGIIEvents() {
+    List<EventID> events = new LinkedList<>();
+    for (long i = positionBeforeGII; i < positionAfterGII + 1; i++) {
+      Map.Entry entry = region.getEntry(i);
+      // could be already removed after processing QueueRemovalMessage
+      if (entry != null && entry.getValue() instanceof HAEventWrapper) {
+        HAEventWrapper wrapper = uncheckedCast(entry.getValue());
+        events.add(wrapper.getEventId());
+      }
+    }
+    return events;
+  }
+
+  boolean removeDispatchedEventAfterSyncWithPrimary(EventID id) {
+    boolean interrupted = Thread.interrupted();
+    try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("removing dispatched event after sync with primary on queue {} for {}",
+            regionName, id);
+      }
+      removeDispatchedEvents(id);
+    } catch (RegionDestroyedException ignore) {
+      logger.info(
+          "Queue found destroyed while processing dispatched sequence ID after syn."
+              + " The event ID is {} for HARegion with name={}",
+          id, regionName);

Review comment:
       This might be better as `logger.info("HARegionQueue {} was found to be destroyed when attempting to remove dispatched event with ID {} after sync", regionName, id);`

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> removedEvents;
+    addEvents();
+    doReturn(false).when(spy).isRemoved(id1);
+    doReturn(true).when(spy).isRemoved(id2);
+    doReturn(true).when(spy).isRemoved(id3);
+    doReturn(false).when(spy).isRemoved(id4);
+
+    removedEvents = spy.getDispatchedOrRemovedEvents(eventIDS);
+
+    assertThat(removedEvents.size()).isEqualTo(2);
+    assertThat(removedEvents.contains(id2));
+    assertThat(removedEvents.contains(id3));
+  }
+
+  private void addEvents() {
+    eventIDS.add(id1);
+    eventIDS.add(id2);
+    eventIDS.add(id3);
+    eventIDS.add(id4);
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfDispatchedAndCurrentEventsAreRemoved() {
+    HARegionQueue spy = spy(haRegionQueue);
+    doReturn(null).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsFalseIfSequenceIdGreaterThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 99L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isFalse();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdEqualsLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdLessThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(90L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfHasDoneSynchronization() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.hasSynchronizedWithPrimary.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfSynchronizationIsInProgress() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.synchronizeWithPrimaryInProgress.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfGIINotFinished() {
+    HARegionQueue spy = spy(haRegionQueue);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfPrimaryHasOlderThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() - 1));
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsLaterThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() + 1));
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void getGIIEventsReturnsCorrectEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> giiEvents;
+    spy.positionBeforeGII = 1;
+    spy.positionAfterGII = 4;
+    HAEventWrapper wrapper1 = mock(HAEventWrapper.class);
+    HAEventWrapper wrapper2 = mock(HAEventWrapper.class);
+    when(wrapper1.getEventId()).thenReturn(id1);
+    when(wrapper2.getEventId()).thenReturn(id2);
+    Region.Entry entry1 = mock(Region.Entry.class);
+    Region.Entry entry2 = mock(Region.Entry.class);
+    Region.Entry entry3 = mock(Region.Entry.class);

Review comment:
       The IDE warnings for these lines can be removed by using:
   `Region.Entry<Object,Object> entry1 = uncheckedCast(mock(Region.Entry.class));`
   `Region.Entry<Object,Object> entry2 = uncheckedCast(mock(Region.Entry.class));`
   `Region.Entry<Object,Object> entry3 = uncheckedCast(mock(Region.Entry.class));`

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> removedEvents;
+    addEvents();
+    doReturn(false).when(spy).isRemoved(id1);
+    doReturn(true).when(spy).isRemoved(id2);
+    doReturn(true).when(spy).isRemoved(id3);
+    doReturn(false).when(spy).isRemoved(id4);
+
+    removedEvents = spy.getDispatchedOrRemovedEvents(eventIDS);
+
+    assertThat(removedEvents.size()).isEqualTo(2);
+    assertThat(removedEvents.contains(id2));
+    assertThat(removedEvents.contains(id3));
+  }
+
+  private void addEvents() {
+    eventIDS.add(id1);
+    eventIDS.add(id2);
+    eventIDS.add(id3);
+    eventIDS.add(id4);
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfDispatchedAndCurrentEventsAreRemoved() {
+    HARegionQueue spy = spy(haRegionQueue);
+    doReturn(null).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsFalseIfSequenceIdGreaterThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 99L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isFalse();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdEqualsLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdLessThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(90L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfHasDoneSynchronization() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.hasSynchronizedWithPrimary.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfSynchronizationIsInProgress() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.synchronizeWithPrimaryInProgress.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfGIINotFinished() {
+    HARegionQueue spy = spy(haRegionQueue);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfPrimaryHasOlderThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() - 1));
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsLaterThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() + 1));
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void getGIIEventsReturnsCorrectEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> giiEvents;
+    spy.positionBeforeGII = 1;
+    spy.positionAfterGII = 4;
+    HAEventWrapper wrapper1 = mock(HAEventWrapper.class);
+    HAEventWrapper wrapper2 = mock(HAEventWrapper.class);
+    when(wrapper1.getEventId()).thenReturn(id1);
+    when(wrapper2.getEventId()).thenReturn(id2);
+    Region.Entry entry1 = mock(Region.Entry.class);
+    Region.Entry entry2 = mock(Region.Entry.class);
+    Region.Entry entry3 = mock(Region.Entry.class);
+    when(entry1.getValue()).thenReturn(wrapper1);
+    when(entry3.getValue()).thenReturn(wrapper2);
+    when(entry2.getValue()).thenReturn(id3);
+
+    when(haRegion.getEntry(1L)).thenReturn(entry1);
+    when(haRegion.getEntry(2L)).thenReturn(entry2);
+    when(haRegion.getEntry(3L)).thenReturn(null);
+    when(haRegion.getEntry(4L)).thenReturn(entry3);
+
+    giiEvents = spy.getGIIEvents();
+
+    assertThat(giiEvents.size()).isEqualTo(2);
+    assertThat(giiEvents.contains(id1));
+    assertThat(giiEvents.contains(id2));
+  }
+
+  @Test
+  public void doSynchronizationWithPrimaryReturnsIfHasDoneSynchronization() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.hasSynchronizedWithPrimary.set(true);
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+    verify(spy, never()).getGIIEvents();
+  }
+
+  @Test
+  public void doSynchronizationWithPrimaryReturnsIfNoGIIEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    int maxChunkSize = 1000;
+    spy.hasSynchronizedWithPrimary.set(true);
+    doReturn(new LinkedList<EventID>()).when(spy).getGIIEvents();
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+
+    verify(spy, never()).getChunks(eventIDS, maxChunkSize);
+    verify(spy, never()).removeDispatchedEvents(primary, internalCache, eventIDS);
+    assertThat(spy.hasSynchronizedWithPrimary).isTrue();
+    assertThat(spy.synchronizeWithPrimaryInProgress).isFalse();
+  }
+
+  @Test
+  public void doSynchronizationWithPrimaryRemoveDispatchedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    int maxChunkSize = 1000;
+    addEvents();
+    doReturn(eventIDS).when(spy).getGIIEvents();
+    doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, eventIDS);
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+
+    verify(spy, never()).getChunks(eventIDS, maxChunkSize);
+    verify(spy).removeDispatchedEvents(primary, internalCache, eventIDS);
+    assertThat(spy.hasSynchronizedWithPrimary).isTrue();
+    assertThat(spy.synchronizeWithPrimaryInProgress).isFalse();
+  }
+
+  @Test
+  public void hasSynchronizedWithPrimaryNotSetIfRemoveDispatchedEventsFails() {
+    HARegionQueue spy = spy(haRegionQueue);
+    int maxChunkSize = 1000;
+    addEvents();
+    doReturn(eventIDS).when(spy).getGIIEvents();
+    doReturn(false).when(spy).removeDispatchedEvents(primary, internalCache, eventIDS);
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+
+    verify(spy, never()).getChunks(eventIDS, maxChunkSize);
+    verify(spy).removeDispatchedEvents(primary, internalCache, eventIDS);
+    assertThat(spy.hasSynchronizedWithPrimary).isFalse();
+    assertThat(spy.synchronizeWithPrimaryInProgress).isFalse();
+  }
+
+  @Test
+  public void hasSynchronizedWithPrimaryRemoveChunksIfManyGIIEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    int maxChunkSize = 1000;
+    for (int i = 0; i < 1100; i++) {
+      eventIDS.add(mock(EventID.class));
+    }
+    createChunks();
+    doReturn(eventIDS).when(spy).getGIIEvents();
+    doReturn(chunks).when(spy).getChunks(eventIDS, maxChunkSize);
+    doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, chunk1);
+    doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, chunk2);
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+
+    verify(spy).getChunks(eventIDS, maxChunkSize);
+    verify(spy).removeDispatchedEvents(primary, internalCache, chunk1);
+    verify(spy).removeDispatchedEvents(primary, internalCache, chunk2);
+    assertThat(spy.hasSynchronizedWithPrimary).isTrue();
+    assertThat(spy.synchronizeWithPrimaryInProgress).isFalse();
+  }
+
+  private void createChunks() {
+    chunk1.add(id1);
+    chunk2.add(id2);
+    chunks.add(chunk1);
+    chunks.add(chunk2);
+  }
+
+  @Test
+  public void hasSynchronizedWithPrimaryNotSetIfRemoveChunksFails() {
+    HARegionQueue spy = spy(haRegionQueue);
+    int maxChunkSize = 1000;
+    for (int i = 0; i < 1100; i++) {
+      eventIDS.add(mock(EventID.class));
+    }
+    createChunks();
+    doReturn(eventIDS).when(spy).getGIIEvents();
+    doReturn(chunks).when(spy).getChunks(eventIDS, maxChunkSize);
+    doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, chunk1);
+    doReturn(false).when(spy).removeDispatchedEvents(primary, internalCache, chunk2);
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+
+    verify(spy).getChunks(eventIDS, maxChunkSize);
+    verify(spy).removeDispatchedEvents(primary, internalCache, chunk1);
+    verify(spy).removeDispatchedEvents(primary, internalCache, chunk2);
+    assertThat(spy.hasSynchronizedWithPrimary).isFalse();
+    assertThat(spy.synchronizeWithPrimaryInProgress).isFalse();
+  }
+
+  @Test
+  public void getChunksReturnsEqualSizedChunks() {
+    HARegionQueue spy = spy(haRegionQueue);
+    addEvents();
+    // add more events
+    eventIDS.add(mock(EventID.class));
+    eventIDS.add(mock(EventID.class));
+
+    Collection<List<EventID>> myChunks = spy.getChunks(eventIDS, 2);
+
+    assertThat(myChunks.size()).isEqualTo(3);

Review comment:
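       Could this 3 be replaced with a calculated value, and the 2 for max chunk size extracted to a variable? I think that `eventIDS.size()/maxChunkSize` should be the correct value here, since the 6 events in this test divide evenly by the chunk size. (In general the number of chunks is that quotient rounded up, because a final partial chunk still counts as one.)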

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {
+    List<EventID> removedEvents = new LinkedList<>();
+    for (EventID eventId : eventIds) {
+      if (isRemoved(eventId)) {
+        removedEvents.add(eventId);
+      }
+    }
+    return removedEvents;
+  }
+
+  boolean isRemoved(EventID eventId) {
+    DispatchedAndCurrentEvents wrapper = getDispatchedAndCurrentEvents(eventId);
+    if (wrapper != null && eventId.getSequenceID() > wrapper.lastDispatchedSequenceId) {
+      return false;
+    }
+    return true;
+  }
+
+  DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+    ThreadIdentifier tid = getThreadIdentifier(eventId);
+    return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+  }
+
+  public void synchronizeQueueWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get() || synchronizeWithPrimaryInProgress.get()
+        || !doneGIIQueueing.get()) {
+      return;
+    }
+
+    if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Don't send to primary with version older than KnownVersion.GEODE_1_14_0");
+      }
+      return;
+    }
+    runSynchronizationWithPrimary(primary, cache);
+  }
+
+  void runSynchronizationWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    cache.getDistributionManager().getExecutors().getWaitingThreadPool()
+        .execute(() -> doSynchronizationWithPrimary(primary, cache));
+  }
+
+  synchronized void doSynchronizationWithPrimary(InternalDistributedMember primary,
+      InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get()) {
+      return;
+    }
+    synchronizeWithPrimaryInProgress.set(true);
+    int maxChunkSize = 1000;
+
+    try {
+      List<EventID> giiEvents = getGIIEvents();
+      if (giiEvents.size() == 0) {
+        hasSynchronizedWithPrimary.set(true);
+        return;
+      }
+      Collection<List<EventID>> chunks = null;
+
+      if (giiEvents.size() > maxChunkSize) {
+        chunks = getChunks(giiEvents, maxChunkSize);
+      }
+
+      if (chunks == null) {
+        if (!removeDispatchedEvents(primary, cache, giiEvents)) {
+          return;
+        }
+      } else {
+        for (List<EventID> chunk : chunks) {
+          if (!removeDispatchedEvents(primary, cache, chunk)) {
+            return;
+          }
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("hasSynchronizedWithPrimary has synced with primary {} on queue: {} ", primary,
+            regionName);
+      }
+      hasSynchronizedWithPrimary.set(true);
+    } finally {
+      synchronizeWithPrimaryInProgress.set(false);
+    }
+  }
+
+  Collection<List<EventID>> getChunks(List<EventID> events, int size) {
+    AtomicInteger counter = new AtomicInteger(0);
+    return events.stream().collect(Collectors.groupingBy(event -> counter.getAndIncrement() / size))
+        .values();
+  }
+
+  boolean removeDispatchedEvents(InternalDistributedMember primary, InternalCache cache,
+      List<EventID> chunkEvents) {
+    List<EventID> dispatchedEvents = getDispatchedEventsFromPrimary(primary, cache, chunkEvents);
+
+    if (dispatchedEvents == null) {
+      // failed to get events from current primary, need to retry.
+      return false;
+    }
+
+    for (EventID id : dispatchedEvents) {
+      if (!removeDispatchedEventAfterSyncWithPrimary(id)) {
+        // failed to remove all dispatched events, need to retry
+        return false;
+      }
+    }
+    return true;
+  }
+
+  List<EventID> getDispatchedEventsFromPrimary(InternalDistributedMember primary,
+      InternalCache cache, List<EventID> chunkEvents) {
+    return QueueSynchronizationProcessor.getDispatchedEvents(cache.getDistributionManager(),
+        primary, regionName, chunkEvents);
+  }
+
+  List<EventID> getGIIEvents() {
+    List<EventID> events = new LinkedList<>();
+    for (long i = positionBeforeGII; i < positionAfterGII + 1; i++) {
+      Map.Entry entry = region.getEntry(i);
+      // could be already removed after processing QueueRemovalMessage
+      if (entry != null && entry.getValue() instanceof HAEventWrapper) {
+        HAEventWrapper wrapper = uncheckedCast(entry.getValue());

Review comment:
       The use of the `uncheckedCast()` method here is not necessary, since the `instanceof` check ensures that the result of `entry.getValue()` can be safely cast to `HAEventWrapper`.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {
+    List<EventID> removedEvents = new LinkedList<>();
+    for (EventID eventId : eventIds) {
+      if (isRemoved(eventId)) {
+        removedEvents.add(eventId);
+      }
+    }
+    return removedEvents;
+  }
+
+  boolean isRemoved(EventID eventId) {
+    DispatchedAndCurrentEvents wrapper = getDispatchedAndCurrentEvents(eventId);
+    if (wrapper != null && eventId.getSequenceID() > wrapper.lastDispatchedSequenceId) {
+      return false;
+    }
+    return true;
+  }
+
+  DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+    ThreadIdentifier tid = getThreadIdentifier(eventId);
+    return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+  }
+
+  public void synchronizeQueueWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get() || synchronizeWithPrimaryInProgress.get()
+        || !doneGIIQueueing.get()) {
+      return;
+    }
+
+    if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Don't send to primary with version older than KnownVersion.GEODE_1_14_0");
+      }
+      return;
+    }
+    runSynchronizationWithPrimary(primary, cache);
+  }
+
+  void runSynchronizationWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    cache.getDistributionManager().getExecutors().getWaitingThreadPool()
+        .execute(() -> doSynchronizationWithPrimary(primary, cache));
+  }
+
+  synchronized void doSynchronizationWithPrimary(InternalDistributedMember primary,
+      InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get()) {
+      return;
+    }
+    synchronizeWithPrimaryInProgress.set(true);
+    int maxChunkSize = 1000;
+
+    try {
+      List<EventID> giiEvents = getGIIEvents();
+      if (giiEvents.size() == 0) {
+        hasSynchronizedWithPrimary.set(true);
+        return;
+      }
+      Collection<List<EventID>> chunks = null;
+
+      if (giiEvents.size() > maxChunkSize) {
+        chunks = getChunks(giiEvents, maxChunkSize);
+      }
+
+      if (chunks == null) {
+        if (!removeDispatchedEvents(primary, cache, giiEvents)) {
+          return;
+        }
+      } else {
+        for (List<EventID> chunk : chunks) {
+          if (!removeDispatchedEvents(primary, cache, chunk)) {
+            return;
+          }
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("hasSynchronizedWithPrimary has synced with primary {} on queue: {} ", primary,
+            regionName);
+      }
+      hasSynchronizedWithPrimary.set(true);
+    } finally {
+      synchronizeWithPrimaryInProgress.set(false);
+    }
+  }
+
+  Collection<List<EventID>> getChunks(List<EventID> events, int size) {
+    AtomicInteger counter = new AtomicInteger(0);
+    return events.stream().collect(Collectors.groupingBy(event -> counter.getAndIncrement() / size))
+        .values();
+  }
+
+  boolean removeDispatchedEvents(InternalDistributedMember primary, InternalCache cache,
+      List<EventID> chunkEvents) {
+    List<EventID> dispatchedEvents = getDispatchedEventsFromPrimary(primary, cache, chunkEvents);
+
+    if (dispatchedEvents == null) {
+      // failed to get events from current primary, need to retry.
+      return false;
+    }
+
+    for (EventID id : dispatchedEvents) {
+      if (!removeDispatchedEventAfterSyncWithPrimary(id)) {
+        // failed to remove all dispatched events, need to retry
+        return false;
+      }
+    }
+    return true;
+  }
+
+  List<EventID> getDispatchedEventsFromPrimary(InternalDistributedMember primary,
+      InternalCache cache, List<EventID> chunkEvents) {
+    return QueueSynchronizationProcessor.getDispatchedEvents(cache.getDistributionManager(),
+        primary, regionName, chunkEvents);
+  }
+
+  List<EventID> getGIIEvents() {
+    List<EventID> events = new LinkedList<>();
+    for (long i = positionBeforeGII; i < positionAfterGII + 1; i++) {
+      Map.Entry entry = region.getEntry(i);
+      // could be already removed after processing QueueRemovalMessage
+      if (entry != null && entry.getValue() instanceof HAEventWrapper) {
+        HAEventWrapper wrapper = uncheckedCast(entry.getValue());
+        events.add(wrapper.getEventId());
+      }
+    }
+    return events;
+  }
+
+  boolean removeDispatchedEventAfterSyncWithPrimary(EventID id) {
+    boolean interrupted = Thread.interrupted();
+    try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("removing dispatched event after sync with primary on queue {} for {}",

Review comment:
       This might be better worded as "After sync with primary for HARegionQueue {}, removing dispatched event with ID {}"

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {

Review comment:
       The name of this method (and that of the `isRemoved()` method below it) is a little confusing to me. It appears that we only check if events have been dispatched by the primary, and the naming up to this point has been consistently only talking about dispatched events. Would it make sense for these methods to be renamed to `getDispatchedEvents()` and `isDispatched()`, for clarity?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {
+    List<EventID> removedEvents = new LinkedList<>();
+    for (EventID eventId : eventIds) {
+      if (isRemoved(eventId)) {
+        removedEvents.add(eventId);
+      }
+    }
+    return removedEvents;
+  }
+
+  boolean isRemoved(EventID eventId) {
+    DispatchedAndCurrentEvents wrapper = getDispatchedAndCurrentEvents(eventId);
+    if (wrapper != null && eventId.getSequenceID() > wrapper.lastDispatchedSequenceId) {
+      return false;
+    }
+    return true;
+  }
+
+  DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+    ThreadIdentifier tid = getThreadIdentifier(eventId);
+    return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+  }
+
+  public void synchronizeQueueWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get() || synchronizeWithPrimaryInProgress.get()
+        || !doneGIIQueueing.get()) {
+      return;
+    }
+
+    if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Don't send to primary with version older than KnownVersion.GEODE_1_14_0");
+      }
+      return;
+    }
+    runSynchronizationWithPrimary(primary, cache);
+  }
+
+  void runSynchronizationWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    cache.getDistributionManager().getExecutors().getWaitingThreadPool()
+        .execute(() -> doSynchronizationWithPrimary(primary, cache));
+  }
+
+  synchronized void doSynchronizationWithPrimary(InternalDistributedMember primary,
+      InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get()) {
+      return;
+    }
+    synchronizeWithPrimaryInProgress.set(true);
+    int maxChunkSize = 1000;
+
+    try {
+      List<EventID> giiEvents = getGIIEvents();
+      if (giiEvents.size() == 0) {
+        hasSynchronizedWithPrimary.set(true);
+        return;
+      }
+      Collection<List<EventID>> chunks = null;
+
+      if (giiEvents.size() > maxChunkSize) {
+        chunks = getChunks(giiEvents, maxChunkSize);
+      }
+
+      if (chunks == null) {
+        if (!removeDispatchedEvents(primary, cache, giiEvents)) {
+          return;
+        }
+      } else {
+        for (List<EventID> chunk : chunks) {
+          if (!removeDispatchedEvents(primary, cache, chunk)) {
+            return;
+          }
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("hasSynchronizedWithPrimary has synced with primary {} on queue: {} ", primary,
+            regionName);
+      }
+      hasSynchronizedWithPrimary.set(true);
+    } finally {
+      synchronizeWithPrimaryInProgress.set(false);
+    }
+  }
+
+  Collection<List<EventID>> getChunks(List<EventID> events, int size) {
+    AtomicInteger counter = new AtomicInteger(0);
+    return events.stream().collect(Collectors.groupingBy(event -> counter.getAndIncrement() / size))
+        .values();
+  }
+
+  boolean removeDispatchedEvents(InternalDistributedMember primary, InternalCache cache,
+      List<EventID> chunkEvents) {
+    List<EventID> dispatchedEvents = getDispatchedEventsFromPrimary(primary, cache, chunkEvents);
+
+    if (dispatchedEvents == null) {
+      // failed to get events from current primary, need to retry.
+      return false;
+    }
+
+    for (EventID id : dispatchedEvents) {
+      if (!removeDispatchedEventAfterSyncWithPrimary(id)) {
+        // failed to remove all dispatched events, need to retry
+        return false;
+      }
+    }
+    return true;
+  }
+
+  List<EventID> getDispatchedEventsFromPrimary(InternalDistributedMember primary,
+      InternalCache cache, List<EventID> chunkEvents) {
+    return QueueSynchronizationProcessor.getDispatchedEvents(cache.getDistributionManager(),
+        primary, regionName, chunkEvents);
+  }
+
+  List<EventID> getGIIEvents() {
+    List<EventID> events = new LinkedList<>();
+    for (long i = positionBeforeGII; i < positionAfterGII + 1; i++) {
+      Map.Entry entry = region.getEntry(i);
+      // could be already removed after processing QueueRemovalMessage
+      if (entry != null && entry.getValue() instanceof HAEventWrapper) {
+        HAEventWrapper wrapper = uncheckedCast(entry.getValue());
+        events.add(wrapper.getEventId());
+      }
+    }
+    return events;
+  }
+
+  boolean removeDispatchedEventAfterSyncWithPrimary(EventID id) {
+    boolean interrupted = Thread.interrupted();
+    try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("removing dispatched event after sync with primary on queue {} for {}",
+            regionName, id);
+      }
+      removeDispatchedEvents(id);
+    } catch (RegionDestroyedException ignore) {
+      logger.info(
+          "Queue found destroyed while processing dispatched sequence ID after syn."
+              + " The event ID is {} for HARegion with name={}",
+          id, regionName);
+    } catch (CancelException ignore) {
+      return false;
+    } catch (CacheException e) {
+      logger.error(String.format(
+          "Sync with primary got Exception when removing from the queue. The problem is with event ID, %s for HARegion with name=%s",
+          regionName, id),

Review comment:
       It seems like the arguments for the string format are in the wrong order here. Alternatively, this might be better worded as "HARegionQueue %s encountered an exception when attempting to remove event with ID %s from the queue", which would have the arguments in the correct order.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java
##########
@@ -212,4 +213,24 @@ public void removeQueueEventReturnsTrueIfRemovalThrowsRejectedExecutionException
 
     assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2, eventID2)).isTrue();
   }
+
+  @Test
+  public void synchronizeQueueWithPrimaryInvokedAfterProcessEachRegionQueue() {
+    addToMessagesList();
+    Iterator iterator = messagesList.iterator();

Review comment:
       The IDE warning for raw use of parameterized class can be removed by using `Iterator<Object>` here.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> removedEvents;
+    addEvents();
+    doReturn(false).when(spy).isRemoved(id1);
+    doReturn(true).when(spy).isRemoved(id2);
+    doReturn(true).when(spy).isRemoved(id3);
+    doReturn(false).when(spy).isRemoved(id4);
+
+    removedEvents = spy.getDispatchedOrRemovedEvents(eventIDS);
+
+    assertThat(removedEvents.size()).isEqualTo(2);
+    assertThat(removedEvents.contains(id2));
+    assertThat(removedEvents.contains(id3));

Review comment:
       These three assertions can be simplified to `assertThat(removedEvents).containsExactlyInAnyOrder(id2, id3);`

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> removedEvents;
+    addEvents();
+    doReturn(false).when(spy).isRemoved(id1);
+    doReturn(true).when(spy).isRemoved(id2);
+    doReturn(true).when(spy).isRemoved(id3);
+    doReturn(false).when(spy).isRemoved(id4);
+
+    removedEvents = spy.getDispatchedOrRemovedEvents(eventIDS);
+
+    assertThat(removedEvents.size()).isEqualTo(2);
+    assertThat(removedEvents.contains(id2));
+    assertThat(removedEvents.contains(id3));
+  }
+
+  private void addEvents() {
+    eventIDS.add(id1);
+    eventIDS.add(id2);
+    eventIDS.add(id3);
+    eventIDS.add(id4);
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfDispatchedAndCurrentEventsAreRemoved() {

Review comment:
       If the comments about the naming of `getDispatchedOrRemovedEvents()` and `isRemoved()` are applied, this method name should also be changed, along with the following 3 test methods.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> removedEvents;

Review comment:
       The declaration and assignment of this variable can be moved to the same line.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {

Review comment:
       If the comments about the naming of `getDispatchedOrRemovedEvents()` and `isRemoved()` are applied, this method name should also be changed.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> removedEvents;
+    addEvents();
+    doReturn(false).when(spy).isRemoved(id1);
+    doReturn(true).when(spy).isRemoved(id2);
+    doReturn(true).when(spy).isRemoved(id3);
+    doReturn(false).when(spy).isRemoved(id4);
+
+    removedEvents = spy.getDispatchedOrRemovedEvents(eventIDS);
+
+    assertThat(removedEvents.size()).isEqualTo(2);
+    assertThat(removedEvents.contains(id2));
+    assertThat(removedEvents.contains(id3));
+  }
+
+  private void addEvents() {
+    eventIDS.add(id1);
+    eventIDS.add(id2);
+    eventIDS.add(id3);
+    eventIDS.add(id4);
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfDispatchedAndCurrentEventsAreRemoved() {
+    HARegionQueue spy = spy(haRegionQueue);
+    doReturn(null).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsFalseIfSequenceIdGreaterThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 99L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isFalse();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdEqualsLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdLessThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(90L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfHasDoneSynchronization() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.hasSynchronizedWithPrimary.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfSynchronizationIsInProgress() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.synchronizeWithPrimaryInProgress.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfGIINotFinished() {
+    HARegionQueue spy = spy(haRegionQueue);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfPrimaryHasOlderThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() - 1));
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsLaterThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() + 1));
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void getGIIEventsReturnsCorrectEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> giiEvents;
+    spy.positionBeforeGII = 1;
+    spy.positionAfterGII = 4;
+    HAEventWrapper wrapper1 = mock(HAEventWrapper.class);
+    HAEventWrapper wrapper2 = mock(HAEventWrapper.class);
+    when(wrapper1.getEventId()).thenReturn(id1);
+    when(wrapper2.getEventId()).thenReturn(id2);
+    Region.Entry entry1 = mock(Region.Entry.class);
+    Region.Entry entry2 = mock(Region.Entry.class);
+    Region.Entry entry3 = mock(Region.Entry.class);
+    when(entry1.getValue()).thenReturn(wrapper1);
+    when(entry3.getValue()).thenReturn(wrapper2);
+    when(entry2.getValue()).thenReturn(id3);

Review comment:
       The numbering of things is a little confusing here. Is there a reason that `entry3` returns `wrapper2` and `entry2` returns `id3`?

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> removedEvents;
+    addEvents();
+    doReturn(false).when(spy).isRemoved(id1);
+    doReturn(true).when(spy).isRemoved(id2);
+    doReturn(true).when(spy).isRemoved(id3);
+    doReturn(false).when(spy).isRemoved(id4);
+
+    removedEvents = spy.getDispatchedOrRemovedEvents(eventIDS);
+
+    assertThat(removedEvents.size()).isEqualTo(2);
+    assertThat(removedEvents.contains(id2));
+    assertThat(removedEvents.contains(id3));
+  }
+
+  private void addEvents() {
+    eventIDS.add(id1);
+    eventIDS.add(id2);
+    eventIDS.add(id3);
+    eventIDS.add(id4);
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfDispatchedAndCurrentEventsAreRemoved() {
+    HARegionQueue spy = spy(haRegionQueue);
+    doReturn(null).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsFalseIfSequenceIdGreaterThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 99L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isFalse();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdEqualsLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdLessThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(90L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfHasDoneSynchronization() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.hasSynchronizedWithPrimary.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfSynchronizationIsInProgress() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.synchronizeWithPrimaryInProgress.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfGIINotFinished() {
+    HARegionQueue spy = spy(haRegionQueue);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfPrimaryHasOlderThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() - 1));
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsLaterThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() + 1));
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void getGIIEventsReturnsCorrectEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> giiEvents;
+    spy.positionBeforeGII = 1;
+    spy.positionAfterGII = 4;
+    HAEventWrapper wrapper1 = mock(HAEventWrapper.class);
+    HAEventWrapper wrapper2 = mock(HAEventWrapper.class);
+    when(wrapper1.getEventId()).thenReturn(id1);
+    when(wrapper2.getEventId()).thenReturn(id2);
+    Region.Entry entry1 = mock(Region.Entry.class);
+    Region.Entry entry2 = mock(Region.Entry.class);
+    Region.Entry entry3 = mock(Region.Entry.class);
+    when(entry1.getValue()).thenReturn(wrapper1);
+    when(entry3.getValue()).thenReturn(wrapper2);
+    when(entry2.getValue()).thenReturn(id3);
+
+    when(haRegion.getEntry(1L)).thenReturn(entry1);
+    when(haRegion.getEntry(2L)).thenReturn(entry2);
+    when(haRegion.getEntry(3L)).thenReturn(null);
+    when(haRegion.getEntry(4L)).thenReturn(entry3);
+
+    giiEvents = spy.getGIIEvents();
+
+    assertThat(giiEvents.size()).isEqualTo(2);
+    assertThat(giiEvents.contains(id1));
+    assertThat(giiEvents.contains(id2));

Review comment:
       These three assertions can be simplified to `assertThat(giiEvents).containsExactlyInAnyOrder(id1, id2);`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org